xref: /openbmc/linux/fs/dlm/lock.c (revision 2209fda3)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94 
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102 
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114 
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123 
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135 
136 #define modes_compat(gr, rq) \
137 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138 
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143 
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149 
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161 
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169 	       (unsigned long long)lkb->lkb_recover_seq);
170 }
171 
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175 	       "rlc %d name %s\n",
176 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178 	       r->res_name);
179 }
180 
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183 	struct dlm_lkb *lkb;
184 
185 	dlm_print_rsb(r);
186 
187 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189 	printk(KERN_ERR "rsb lookup list\n");
190 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191 		dlm_print_lkb(lkb);
192 	printk(KERN_ERR "rsb grant queue:\n");
193 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194 		dlm_print_lkb(lkb);
195 	printk(KERN_ERR "rsb convert queue:\n");
196 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197 		dlm_print_lkb(lkb);
198 	printk(KERN_ERR "rsb wait queue:\n");
199 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200 		dlm_print_lkb(lkb);
201 }
202 
203 /* Threads cannot use the lockspace while it's being recovered */
204 
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207 	down_read(&ls->ls_in_recovery);
208 }
209 
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212 	up_read(&ls->ls_in_recovery);
213 }
214 
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217 	return down_read_trylock(&ls->ls_in_recovery);
218 }
219 
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224 
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229 
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234 
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239 
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244 
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248 	return !!r->res_nodeid;
249 }
250 
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287 				  DLM_IFL_OVERLAP_CANCEL));
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	del_timeout(lkb);
296 
297 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298 
299 	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
300 	   timeout caused the cancel then return -ETIMEDOUT */
301 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303 		rv = -ETIMEDOUT;
304 	}
305 
306 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308 		rv = -EDEADLK;
309 	}
310 
311 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313 
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316 	queue_cast(r, lkb,
317 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319 
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322 	if (is_master_copy(lkb)) {
323 		send_bast(r, lkb, rqmode);
324 	} else {
325 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326 	}
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335 
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338 	kref_get(&r->res_ref);
339 }
340 
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343 	hold_rsb(r);
344 }
345 
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348 
349 static void put_rsb(struct dlm_rsb *r)
350 {
351 	struct dlm_ls *ls = r->res_ls;
352 	uint32_t bucket = r->res_bucket;
353 
354 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
355 	kref_put(&r->res_ref, toss_rsb);
356 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358 
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361 	put_rsb(r);
362 }
363 
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366 	struct dlm_rsb *r1, *r2;
367 	int count = 0;
368 
369 	spin_lock(&ls->ls_new_rsb_spin);
370 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371 		spin_unlock(&ls->ls_new_rsb_spin);
372 		return 0;
373 	}
374 	spin_unlock(&ls->ls_new_rsb_spin);
375 
376 	r1 = dlm_allocate_rsb(ls);
377 	r2 = dlm_allocate_rsb(ls);
378 
379 	spin_lock(&ls->ls_new_rsb_spin);
380 	if (r1) {
381 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382 		ls->ls_new_rsb_count++;
383 	}
384 	if (r2) {
385 		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386 		ls->ls_new_rsb_count++;
387 	}
388 	count = ls->ls_new_rsb_count;
389 	spin_unlock(&ls->ls_new_rsb_spin);
390 
391 	if (!count)
392 		return -ENOMEM;
393 	return 0;
394 }
395 
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399 
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401 			  struct dlm_rsb **r_ret)
402 {
403 	struct dlm_rsb *r;
404 	int count;
405 
406 	spin_lock(&ls->ls_new_rsb_spin);
407 	if (list_empty(&ls->ls_new_rsb)) {
408 		count = ls->ls_new_rsb_count;
409 		spin_unlock(&ls->ls_new_rsb_spin);
410 		log_debug(ls, "find_rsb retry %d %d %s",
411 			  count, dlm_config.ci_new_rsb_count, name);
412 		return -EAGAIN;
413 	}
414 
415 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416 	list_del(&r->res_hashchain);
417 	/* Convert the empty list_head to a NULL rb_node for tree usage: */
418 	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419 	ls->ls_new_rsb_count--;
420 	spin_unlock(&ls->ls_new_rsb_spin);
421 
422 	r->res_ls = ls;
423 	r->res_length = len;
424 	memcpy(r->res_name, name, len);
425 	mutex_init(&r->res_mutex);
426 
427 	INIT_LIST_HEAD(&r->res_lookup);
428 	INIT_LIST_HEAD(&r->res_grantqueue);
429 	INIT_LIST_HEAD(&r->res_convertqueue);
430 	INIT_LIST_HEAD(&r->res_waitqueue);
431 	INIT_LIST_HEAD(&r->res_root_list);
432 	INIT_LIST_HEAD(&r->res_recover_list);
433 
434 	*r_ret = r;
435 	return 0;
436 }
437 
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440 	char maxname[DLM_RESNAME_MAXLEN];
441 
442 	memset(maxname, 0, DLM_RESNAME_MAXLEN);
443 	memcpy(maxname, name, nlen);
444 	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446 
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448 			struct dlm_rsb **r_ret)
449 {
450 	struct rb_node *node = tree->rb_node;
451 	struct dlm_rsb *r;
452 	int rc;
453 
454 	while (node) {
455 		r = rb_entry(node, struct dlm_rsb, res_hashnode);
456 		rc = rsb_cmp(r, name, len);
457 		if (rc < 0)
458 			node = node->rb_left;
459 		else if (rc > 0)
460 			node = node->rb_right;
461 		else
462 			goto found;
463 	}
464 	*r_ret = NULL;
465 	return -EBADR;
466 
467  found:
468 	*r_ret = r;
469 	return 0;
470 }
471 
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474 	struct rb_node **newn = &tree->rb_node;
475 	struct rb_node *parent = NULL;
476 	int rc;
477 
478 	while (*newn) {
479 		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480 					       res_hashnode);
481 
482 		parent = *newn;
483 		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484 		if (rc < 0)
485 			newn = &parent->rb_left;
486 		else if (rc > 0)
487 			newn = &parent->rb_right;
488 		else {
489 			log_print("rsb_insert match");
490 			dlm_dump_rsb(rsb);
491 			dlm_dump_rsb(cur);
492 			return -EEXIST;
493 		}
494 	}
495 
496 	rb_link_node(&rsb->res_hashnode, parent, newn);
497 	rb_insert_color(&rsb->res_hashnode, tree);
498 	return 0;
499 }
500 
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544 
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546 			uint32_t hash, uint32_t b,
547 			int dir_nodeid, int from_nodeid,
548 			unsigned int flags, struct dlm_rsb **r_ret)
549 {
550 	struct dlm_rsb *r = NULL;
551 	int our_nodeid = dlm_our_nodeid();
552 	int from_local = 0;
553 	int from_other = 0;
554 	int from_dir = 0;
555 	int create = 0;
556 	int error;
557 
558 	if (flags & R_RECEIVE_REQUEST) {
559 		if (from_nodeid == dir_nodeid)
560 			from_dir = 1;
561 		else
562 			from_other = 1;
563 	} else if (flags & R_REQUEST) {
564 		from_local = 1;
565 	}
566 
567 	/*
568 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
570 	 * we're the new master.  Our local recovery may not have set
571 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
573 	 * by resending.
574 	 *
575 	 * If someone sends us a request, we are the dir node, and we do
576 	 * not find the rsb anywhere, then recreate it.  This happens if
577 	 * someone sends us a request after we have removed/freed an rsb
578 	 * from our toss list.  (They sent a request instead of lookup
579 	 * because they are using an rsb from their toss list.)
580 	 */
581 
582 	if (from_local || from_dir ||
583 	    (from_other && (dir_nodeid == our_nodeid))) {
584 		create = 1;
585 	}
586 
587  retry:
588 	if (create) {
589 		error = pre_rsb_struct(ls);
590 		if (error < 0)
591 			goto out;
592 	}
593 
594 	spin_lock(&ls->ls_rsbtbl[b].lock);
595 
596 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597 	if (error)
598 		goto do_toss;
599 
600 	/*
601 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
602 	 */
603 
604 	kref_get(&r->res_ref);
605 	error = 0;
606 	goto out_unlock;
607 
608 
609  do_toss:
610 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611 	if (error)
612 		goto do_new;
613 
614 	/*
615 	 * rsb found inactive (master_nodeid may be out of date unless
616 	 * we are the dir_nodeid or were the master)  No other thread
617 	 * is using this rsb because it's on the toss list, so we can
618 	 * look at or update res_master_nodeid without lock_rsb.
619 	 */
620 
621 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
622 		/* our rsb was not master, and another node (not the dir node)
623 		   has sent us a request */
624 		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
626 			  r->res_name);
627 		error = -ENOTBLK;
628 		goto out_unlock;
629 	}
630 
631 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632 		/* don't think this should ever happen */
633 		log_error(ls, "find_rsb toss from_dir %d master %d",
634 			  from_nodeid, r->res_master_nodeid);
635 		dlm_print_rsb(r);
636 		/* fix it and go on */
637 		r->res_master_nodeid = our_nodeid;
638 		r->res_nodeid = 0;
639 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640 		r->res_first_lkid = 0;
641 	}
642 
643 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
644 		/* Because we have held no locks on this rsb,
645 		   res_master_nodeid could have become stale. */
646 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647 		r->res_first_lkid = 0;
648 	}
649 
650 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652 	goto out_unlock;
653 
654 
655  do_new:
656 	/*
657 	 * rsb not found
658 	 */
659 
660 	if (error == -EBADR && !create)
661 		goto out_unlock;
662 
663 	error = get_rsb_struct(ls, name, len, &r);
664 	if (error == -EAGAIN) {
665 		spin_unlock(&ls->ls_rsbtbl[b].lock);
666 		goto retry;
667 	}
668 	if (error)
669 		goto out_unlock;
670 
671 	r->res_hash = hash;
672 	r->res_bucket = b;
673 	r->res_dir_nodeid = dir_nodeid;
674 	kref_init(&r->res_ref);
675 
676 	if (from_dir) {
677 		/* want to see how often this happens */
678 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
679 			  from_nodeid, r->res_name);
680 		r->res_master_nodeid = our_nodeid;
681 		r->res_nodeid = 0;
682 		goto out_add;
683 	}
684 
685 	if (from_other && (dir_nodeid != our_nodeid)) {
686 		/* should never happen */
687 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689 		dlm_free_rsb(r);
690 		r = NULL;
691 		error = -ENOTBLK;
692 		goto out_unlock;
693 	}
694 
695 	if (from_other) {
696 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
697 			  from_nodeid, dir_nodeid, r->res_name);
698 	}
699 
700 	if (dir_nodeid == our_nodeid) {
701 		/* When we are the dir nodeid, we can set the master
702 		   node immediately */
703 		r->res_master_nodeid = our_nodeid;
704 		r->res_nodeid = 0;
705 	} else {
706 		/* set_master will send_lookup to dir_nodeid */
707 		r->res_master_nodeid = 0;
708 		r->res_nodeid = -1;
709 	}
710 
711  out_add:
712 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
713  out_unlock:
714 	spin_unlock(&ls->ls_rsbtbl[b].lock);
715  out:
716 	*r_ret = r;
717 	return error;
718 }
719 
720 /* During recovery, other nodes can send us new MSTCPY locks (from
721    dlm_recover_locks) before we've made ourself master (in
722    dlm_recover_masters). */
723 
724 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
725 			  uint32_t hash, uint32_t b,
726 			  int dir_nodeid, int from_nodeid,
727 			  unsigned int flags, struct dlm_rsb **r_ret)
728 {
729 	struct dlm_rsb *r = NULL;
730 	int our_nodeid = dlm_our_nodeid();
731 	int recover = (flags & R_RECEIVE_RECOVER);
732 	int error;
733 
734  retry:
735 	error = pre_rsb_struct(ls);
736 	if (error < 0)
737 		goto out;
738 
739 	spin_lock(&ls->ls_rsbtbl[b].lock);
740 
741 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
742 	if (error)
743 		goto do_toss;
744 
745 	/*
746 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
747 	 */
748 
749 	kref_get(&r->res_ref);
750 	goto out_unlock;
751 
752 
753  do_toss:
754 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
755 	if (error)
756 		goto do_new;
757 
758 	/*
759 	 * rsb found inactive. No other thread is using this rsb because
760 	 * it's on the toss list, so we can look at or update
761 	 * res_master_nodeid without lock_rsb.
762 	 */
763 
764 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
765 		/* our rsb is not master, and another node has sent us a
766 		   request; this should never happen */
767 		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
768 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
769 		dlm_print_rsb(r);
770 		error = -ENOTBLK;
771 		goto out_unlock;
772 	}
773 
774 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
775 	    (dir_nodeid == our_nodeid)) {
776 		/* our rsb is not master, and we are dir; may as well fix it;
777 		   this should never happen */
778 		log_error(ls, "find_rsb toss our %d master %d dir %d",
779 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
780 		dlm_print_rsb(r);
781 		r->res_master_nodeid = our_nodeid;
782 		r->res_nodeid = 0;
783 	}
784 
785 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
786 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
787 	goto out_unlock;
788 
789 
790  do_new:
791 	/*
792 	 * rsb not found
793 	 */
794 
795 	error = get_rsb_struct(ls, name, len, &r);
796 	if (error == -EAGAIN) {
797 		spin_unlock(&ls->ls_rsbtbl[b].lock);
798 		goto retry;
799 	}
800 	if (error)
801 		goto out_unlock;
802 
803 	r->res_hash = hash;
804 	r->res_bucket = b;
805 	r->res_dir_nodeid = dir_nodeid;
806 	r->res_master_nodeid = dir_nodeid;
807 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
808 	kref_init(&r->res_ref);
809 
810 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
811  out_unlock:
812 	spin_unlock(&ls->ls_rsbtbl[b].lock);
813  out:
814 	*r_ret = r;
815 	return error;
816 }
817 
818 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
819 		    unsigned int flags, struct dlm_rsb **r_ret)
820 {
821 	uint32_t hash, b;
822 	int dir_nodeid;
823 
824 	if (len > DLM_RESNAME_MAXLEN)
825 		return -EINVAL;
826 
827 	hash = jhash(name, len, 0);
828 	b = hash & (ls->ls_rsbtbl_size - 1);
829 
830 	dir_nodeid = dlm_hash2nodeid(ls, hash);
831 
832 	if (dlm_no_directory(ls))
833 		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
834 				      from_nodeid, flags, r_ret);
835 	else
836 		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
837 				      from_nodeid, flags, r_ret);
838 }
839 
840 /* we have received a request and found that res_master_nodeid != our_nodeid,
841    so we need to return an error or make ourself the master */
842 
843 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
844 				  int from_nodeid)
845 {
846 	if (dlm_no_directory(ls)) {
847 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
848 			  from_nodeid, r->res_master_nodeid,
849 			  r->res_dir_nodeid);
850 		dlm_print_rsb(r);
851 		return -ENOTBLK;
852 	}
853 
854 	if (from_nodeid != r->res_dir_nodeid) {
855 		/* our rsb is not master, and another node (not the dir node)
856 	   	   has sent us a request.  this is much more common when our
857 	   	   master_nodeid is zero, so limit debug to non-zero.  */
858 
859 		if (r->res_master_nodeid) {
860 			log_debug(ls, "validate master from_other %d master %d "
861 				  "dir %d first %x %s", from_nodeid,
862 				  r->res_master_nodeid, r->res_dir_nodeid,
863 				  r->res_first_lkid, r->res_name);
864 		}
865 		return -ENOTBLK;
866 	} else {
867 		/* our rsb is not master, but the dir nodeid has sent us a
868 	   	   request; this could happen with master 0 / res_nodeid -1 */
869 
870 		if (r->res_master_nodeid) {
871 			log_error(ls, "validate master from_dir %d master %d "
872 				  "first %x %s",
873 				  from_nodeid, r->res_master_nodeid,
874 				  r->res_first_lkid, r->res_name);
875 		}
876 
877 		r->res_master_nodeid = dlm_our_nodeid();
878 		r->res_nodeid = 0;
879 		return 0;
880 	}
881 }
882 
883 /*
884  * We're the dir node for this res and another node wants to know the
885  * master nodeid.  During normal operation (non recovery) this is only
886  * called from receive_lookup(); master lookups when the local node is
887  * the dir node are done by find_rsb().
888  *
889  * normal operation, we are the dir node for a resource
890  * . _request_lock
891  * . set_master
892  * . send_lookup
893  * . receive_lookup
894  * . dlm_master_lookup flags 0
895  *
896  * recover directory, we are rebuilding dir for all resources
897  * . dlm_recover_directory
898  * . dlm_rcom_names
899  *   remote node sends back the rsb names it is master of and we are dir of
900  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
901  *   we either create new rsb setting remote node as master, or find existing
902  *   rsb and set master to be the remote node.
903  *
904  * recover masters, we are finding the new master for resources
905  * . dlm_recover_masters
906  * . recover_master
907  * . dlm_send_rcom_lookup
908  * . receive_rcom_lookup
909  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
910  */
911 
912 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
913 		      unsigned int flags, int *r_nodeid, int *result)
914 {
915 	struct dlm_rsb *r = NULL;
916 	uint32_t hash, b;
917 	int from_master = (flags & DLM_LU_RECOVER_DIR);
918 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
919 	int our_nodeid = dlm_our_nodeid();
920 	int dir_nodeid, error, toss_list = 0;
921 
922 	if (len > DLM_RESNAME_MAXLEN)
923 		return -EINVAL;
924 
925 	if (from_nodeid == our_nodeid) {
926 		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
927 			  our_nodeid, flags);
928 		return -EINVAL;
929 	}
930 
931 	hash = jhash(name, len, 0);
932 	b = hash & (ls->ls_rsbtbl_size - 1);
933 
934 	dir_nodeid = dlm_hash2nodeid(ls, hash);
935 	if (dir_nodeid != our_nodeid) {
936 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
937 			  from_nodeid, dir_nodeid, our_nodeid, hash,
938 			  ls->ls_num_nodes);
939 		*r_nodeid = -1;
940 		return -EINVAL;
941 	}
942 
943  retry:
944 	error = pre_rsb_struct(ls);
945 	if (error < 0)
946 		return error;
947 
948 	spin_lock(&ls->ls_rsbtbl[b].lock);
949 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
950 	if (!error) {
951 		/* because the rsb is active, we need to lock_rsb before
952 		   checking/changing re_master_nodeid */
953 
954 		hold_rsb(r);
955 		spin_unlock(&ls->ls_rsbtbl[b].lock);
956 		lock_rsb(r);
957 		goto found;
958 	}
959 
960 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
961 	if (error)
962 		goto not_found;
963 
964 	/* because the rsb is inactive (on toss list), it's not refcounted
965 	   and lock_rsb is not used, but is protected by the rsbtbl lock */
966 
967 	toss_list = 1;
968  found:
969 	if (r->res_dir_nodeid != our_nodeid) {
970 		/* should not happen, but may as well fix it and carry on */
971 		log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
972 			  r->res_dir_nodeid, our_nodeid, r->res_name);
973 		r->res_dir_nodeid = our_nodeid;
974 	}
975 
976 	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
977 		/* Recovery uses this function to set a new master when
978 		   the previous master failed.  Setting NEW_MASTER will
979 		   force dlm_recover_masters to call recover_master on this
980 		   rsb even though the res_nodeid is no longer removed. */
981 
982 		r->res_master_nodeid = from_nodeid;
983 		r->res_nodeid = from_nodeid;
984 		rsb_set_flag(r, RSB_NEW_MASTER);
985 
986 		if (toss_list) {
987 			/* I don't think we should ever find it on toss list. */
988 			log_error(ls, "dlm_master_lookup fix_master on toss");
989 			dlm_dump_rsb(r);
990 		}
991 	}
992 
993 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
994 		/* this will happen if from_nodeid became master during
995 		   a previous recovery cycle, and we aborted the previous
996 		   cycle before recovering this master value */
997 
998 		log_limit(ls, "dlm_master_lookup from_master %d "
999 			  "master_nodeid %d res_nodeid %d first %x %s",
1000 			  from_nodeid, r->res_master_nodeid, r->res_nodeid,
1001 			  r->res_first_lkid, r->res_name);
1002 
1003 		if (r->res_master_nodeid == our_nodeid) {
1004 			log_error(ls, "from_master %d our_master", from_nodeid);
1005 			dlm_dump_rsb(r);
1006 			goto out_found;
1007 		}
1008 
1009 		r->res_master_nodeid = from_nodeid;
1010 		r->res_nodeid = from_nodeid;
1011 		rsb_set_flag(r, RSB_NEW_MASTER);
1012 	}
1013 
1014 	if (!r->res_master_nodeid) {
1015 		/* this will happen if recovery happens while we're looking
1016 		   up the master for this rsb */
1017 
1018 		log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019 			  from_nodeid, r->res_first_lkid, r->res_name);
1020 		r->res_master_nodeid = from_nodeid;
1021 		r->res_nodeid = from_nodeid;
1022 	}
1023 
1024 	if (!from_master && !fix_master &&
1025 	    (r->res_master_nodeid == from_nodeid)) {
1026 		/* this can happen when the master sends remove, the dir node
1027 		   finds the rsb on the keep list and ignores the remove,
1028 		   and the former master sends a lookup */
1029 
1030 		log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031 			  "first %x %s", from_nodeid, flags,
1032 			  r->res_first_lkid, r->res_name);
1033 	}
1034 
1035  out_found:
1036 	*r_nodeid = r->res_master_nodeid;
1037 	if (result)
1038 		*result = DLM_LU_MATCH;
1039 
1040 	if (toss_list) {
1041 		r->res_toss_time = jiffies;
1042 		/* the rsb was inactive (on toss list) */
1043 		spin_unlock(&ls->ls_rsbtbl[b].lock);
1044 	} else {
1045 		/* the rsb was active */
1046 		unlock_rsb(r);
1047 		put_rsb(r);
1048 	}
1049 	return 0;
1050 
1051  not_found:
1052 	error = get_rsb_struct(ls, name, len, &r);
1053 	if (error == -EAGAIN) {
1054 		spin_unlock(&ls->ls_rsbtbl[b].lock);
1055 		goto retry;
1056 	}
1057 	if (error)
1058 		goto out_unlock;
1059 
1060 	r->res_hash = hash;
1061 	r->res_bucket = b;
1062 	r->res_dir_nodeid = our_nodeid;
1063 	r->res_master_nodeid = from_nodeid;
1064 	r->res_nodeid = from_nodeid;
1065 	kref_init(&r->res_ref);
1066 	r->res_toss_time = jiffies;
1067 
1068 	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069 	if (error) {
1070 		/* should never happen */
1071 		dlm_free_rsb(r);
1072 		spin_unlock(&ls->ls_rsbtbl[b].lock);
1073 		goto retry;
1074 	}
1075 
1076 	if (result)
1077 		*result = DLM_LU_ADD;
1078 	*r_nodeid = from_nodeid;
1079 	error = 0;
1080  out_unlock:
1081 	spin_unlock(&ls->ls_rsbtbl[b].lock);
1082 	return error;
1083 }
1084 
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087 	struct rb_node *n;
1088 	struct dlm_rsb *r;
1089 	int i;
1090 
1091 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092 		spin_lock(&ls->ls_rsbtbl[i].lock);
1093 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095 			if (r->res_hash == hash)
1096 				dlm_dump_rsb(r);
1097 		}
1098 		spin_unlock(&ls->ls_rsbtbl[i].lock);
1099 	}
1100 }
1101 
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104 	struct dlm_rsb *r = NULL;
1105 	uint32_t hash, b;
1106 	int error;
1107 
1108 	hash = jhash(name, len, 0);
1109 	b = hash & (ls->ls_rsbtbl_size - 1);
1110 
1111 	spin_lock(&ls->ls_rsbtbl[b].lock);
1112 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113 	if (!error)
1114 		goto out_dump;
1115 
1116 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117 	if (error)
1118 		goto out;
1119  out_dump:
1120 	dlm_dump_rsb(r);
1121  out:
1122 	spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124 
1125 static void toss_rsb(struct kref *kref)
1126 {
1127 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128 	struct dlm_ls *ls = r->res_ls;
1129 
1130 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131 	kref_init(&r->res_ref);
1132 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133 	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134 	r->res_toss_time = jiffies;
1135 	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1136 	if (r->res_lvbptr) {
1137 		dlm_free_lvb(r->res_lvbptr);
1138 		r->res_lvbptr = NULL;
1139 	}
1140 }
1141 
1142 /* See comment for unhold_lkb */
1143 
1144 static void unhold_rsb(struct dlm_rsb *r)
1145 {
1146 	int rv;
1147 	rv = kref_put(&r->res_ref, toss_rsb);
1148 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
1149 }
1150 
1151 static void kill_rsb(struct kref *kref)
1152 {
1153 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1154 
1155 	/* All work is done after the return from kref_put() so we
1156 	   can release the write_lock before the remove and free. */
1157 
1158 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1159 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1160 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1161 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1162 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1163 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1164 }
1165 
1166 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1167    The rsb must exist as long as any lkb's for it do. */
1168 
1169 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 {
1171 	hold_rsb(r);
1172 	lkb->lkb_resource = r;
1173 }
1174 
1175 static void detach_lkb(struct dlm_lkb *lkb)
1176 {
1177 	if (lkb->lkb_resource) {
1178 		put_rsb(lkb->lkb_resource);
1179 		lkb->lkb_resource = NULL;
1180 	}
1181 }
1182 
1183 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1184 {
1185 	struct dlm_lkb *lkb;
1186 	int rv;
1187 
1188 	lkb = dlm_allocate_lkb(ls);
1189 	if (!lkb)
1190 		return -ENOMEM;
1191 
1192 	lkb->lkb_nodeid = -1;
1193 	lkb->lkb_grmode = DLM_LOCK_IV;
1194 	kref_init(&lkb->lkb_ref);
1195 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1196 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1197 	INIT_LIST_HEAD(&lkb->lkb_time_list);
1198 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
1199 	mutex_init(&lkb->lkb_cb_mutex);
1200 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1201 
1202 	idr_preload(GFP_NOFS);
1203 	spin_lock(&ls->ls_lkbidr_spin);
1204 	rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1205 	if (rv >= 0)
1206 		lkb->lkb_id = rv;
1207 	spin_unlock(&ls->ls_lkbidr_spin);
1208 	idr_preload_end();
1209 
1210 	if (rv < 0) {
1211 		log_error(ls, "create_lkb idr error %d", rv);
1212 		return rv;
1213 	}
1214 
1215 	*lkb_ret = lkb;
1216 	return 0;
1217 }
1218 
1219 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1220 {
1221 	struct dlm_lkb *lkb;
1222 
1223 	spin_lock(&ls->ls_lkbidr_spin);
1224 	lkb = idr_find(&ls->ls_lkbidr, lkid);
1225 	if (lkb)
1226 		kref_get(&lkb->lkb_ref);
1227 	spin_unlock(&ls->ls_lkbidr_spin);
1228 
1229 	*lkb_ret = lkb;
1230 	return lkb ? 0 : -ENOENT;
1231 }
1232 
1233 static void kill_lkb(struct kref *kref)
1234 {
1235 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1236 
1237 	/* All work is done after the return from kref_put() so we
1238 	   can release the write_lock before the detach_lkb */
1239 
1240 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1241 }
1242 
1243 /* __put_lkb() is used when an lkb may not have an rsb attached to
1244    it so we need to provide the lockspace explicitly */
1245 
1246 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1247 {
1248 	uint32_t lkid = lkb->lkb_id;
1249 
1250 	spin_lock(&ls->ls_lkbidr_spin);
1251 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1252 		idr_remove(&ls->ls_lkbidr, lkid);
1253 		spin_unlock(&ls->ls_lkbidr_spin);
1254 
1255 		detach_lkb(lkb);
1256 
1257 		/* for local/process lkbs, lvbptr points to caller's lksb */
1258 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1259 			dlm_free_lvb(lkb->lkb_lvbptr);
1260 		dlm_free_lkb(lkb);
1261 		return 1;
1262 	} else {
1263 		spin_unlock(&ls->ls_lkbidr_spin);
1264 		return 0;
1265 	}
1266 }
1267 
1268 int dlm_put_lkb(struct dlm_lkb *lkb)
1269 {
1270 	struct dlm_ls *ls;
1271 
1272 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1273 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1274 
1275 	ls = lkb->lkb_resource->res_ls;
1276 	return __put_lkb(ls, lkb);
1277 }
1278 
1279 /* This is only called to add a reference when the code already holds
1280    a valid reference to the lkb, so there's no need for locking. */
1281 
1282 static inline void hold_lkb(struct dlm_lkb *lkb)
1283 {
1284 	kref_get(&lkb->lkb_ref);
1285 }
1286 
1287 /* This is called when we need to remove a reference and are certain
1288    it's not the last ref.  e.g. del_lkb is always called between a
1289    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1290    put_lkb would work fine, but would involve unnecessary locking */
1291 
1292 static inline void unhold_lkb(struct dlm_lkb *lkb)
1293 {
1294 	int rv;
1295 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
1296 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1297 }
1298 
1299 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1300 			    int mode)
1301 {
1302 	struct dlm_lkb *lkb = NULL;
1303 
1304 	list_for_each_entry(lkb, head, lkb_statequeue)
1305 		if (lkb->lkb_rqmode < mode)
1306 			break;
1307 
1308 	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1309 }
1310 
1311 /* add/remove lkb to rsb's grant/convert/wait queue */
1312 
1313 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1314 {
1315 	kref_get(&lkb->lkb_ref);
1316 
1317 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1318 
1319 	lkb->lkb_timestamp = ktime_get();
1320 
1321 	lkb->lkb_status = status;
1322 
1323 	switch (status) {
1324 	case DLM_LKSTS_WAITING:
1325 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1326 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1327 		else
1328 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1329 		break;
1330 	case DLM_LKSTS_GRANTED:
1331 		/* convention says granted locks kept in order of grmode */
1332 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1333 				lkb->lkb_grmode);
1334 		break;
1335 	case DLM_LKSTS_CONVERT:
1336 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1337 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1338 		else
1339 			list_add_tail(&lkb->lkb_statequeue,
1340 				      &r->res_convertqueue);
1341 		break;
1342 	default:
1343 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1344 	}
1345 }
1346 
1347 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1348 {
1349 	lkb->lkb_status = 0;
1350 	list_del(&lkb->lkb_statequeue);
1351 	unhold_lkb(lkb);
1352 }
1353 
1354 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1355 {
1356 	hold_lkb(lkb);
1357 	del_lkb(r, lkb);
1358 	add_lkb(r, lkb, sts);
1359 	unhold_lkb(lkb);
1360 }
1361 
1362 static int msg_reply_type(int mstype)
1363 {
1364 	switch (mstype) {
1365 	case DLM_MSG_REQUEST:
1366 		return DLM_MSG_REQUEST_REPLY;
1367 	case DLM_MSG_CONVERT:
1368 		return DLM_MSG_CONVERT_REPLY;
1369 	case DLM_MSG_UNLOCK:
1370 		return DLM_MSG_UNLOCK_REPLY;
1371 	case DLM_MSG_CANCEL:
1372 		return DLM_MSG_CANCEL_REPLY;
1373 	case DLM_MSG_LOOKUP:
1374 		return DLM_MSG_LOOKUP_REPLY;
1375 	}
1376 	return -1;
1377 }
1378 
1379 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1380 {
1381 	int i;
1382 
1383 	for (i = 0; i < num_nodes; i++) {
1384 		if (!warned[i]) {
1385 			warned[i] = nodeid;
1386 			return 0;
1387 		}
1388 		if (warned[i] == nodeid)
1389 			return 1;
1390 	}
1391 	return 0;
1392 }
1393 
1394 void dlm_scan_waiters(struct dlm_ls *ls)
1395 {
1396 	struct dlm_lkb *lkb;
1397 	s64 us;
1398 	s64 debug_maxus = 0;
1399 	u32 debug_scanned = 0;
1400 	u32 debug_expired = 0;
1401 	int num_nodes = 0;
1402 	int *warned = NULL;
1403 
1404 	if (!dlm_config.ci_waitwarn_us)
1405 		return;
1406 
1407 	mutex_lock(&ls->ls_waiters_mutex);
1408 
1409 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1410 		if (!lkb->lkb_wait_time)
1411 			continue;
1412 
1413 		debug_scanned++;
1414 
1415 		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1416 
1417 		if (us < dlm_config.ci_waitwarn_us)
1418 			continue;
1419 
1420 		lkb->lkb_wait_time = 0;
1421 
1422 		debug_expired++;
1423 		if (us > debug_maxus)
1424 			debug_maxus = us;
1425 
1426 		if (!num_nodes) {
1427 			num_nodes = ls->ls_num_nodes;
1428 			warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1429 		}
1430 		if (!warned)
1431 			continue;
1432 		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1433 			continue;
1434 
1435 		log_error(ls, "waitwarn %x %lld %d us check connection to "
1436 			  "node %d", lkb->lkb_id, (long long)us,
1437 			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1438 	}
1439 	mutex_unlock(&ls->ls_waiters_mutex);
1440 	kfree(warned);
1441 
1442 	if (debug_expired)
1443 		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1444 			  debug_scanned, debug_expired,
1445 			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1446 }
1447 
1448 /* add/remove lkb from global waiters list of lkb's waiting for
1449    a reply from a remote node */
1450 
1451 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1452 {
1453 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1454 	int error = 0;
1455 
1456 	mutex_lock(&ls->ls_waiters_mutex);
1457 
1458 	if (is_overlap_unlock(lkb) ||
1459 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1460 		error = -EINVAL;
1461 		goto out;
1462 	}
1463 
1464 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1465 		switch (mstype) {
1466 		case DLM_MSG_UNLOCK:
1467 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1468 			break;
1469 		case DLM_MSG_CANCEL:
1470 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1471 			break;
1472 		default:
1473 			error = -EBUSY;
1474 			goto out;
1475 		}
1476 		lkb->lkb_wait_count++;
1477 		hold_lkb(lkb);
1478 
1479 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1480 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1481 			  lkb->lkb_wait_count, lkb->lkb_flags);
1482 		goto out;
1483 	}
1484 
1485 	DLM_ASSERT(!lkb->lkb_wait_count,
1486 		   dlm_print_lkb(lkb);
1487 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1488 
1489 	lkb->lkb_wait_count++;
1490 	lkb->lkb_wait_type = mstype;
1491 	lkb->lkb_wait_time = ktime_get();
1492 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1493 	hold_lkb(lkb);
1494 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1495  out:
1496 	if (error)
1497 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1498 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
1499 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1500 	mutex_unlock(&ls->ls_waiters_mutex);
1501 	return error;
1502 }
1503 
1504 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1505    list as part of process_requestqueue (e.g. a lookup that has an optimized
1506    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1507    set RESEND and dlm_recover_waiters_post() */
1508 
1509 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1510 				struct dlm_message *ms)
1511 {
1512 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1513 	int overlap_done = 0;
1514 
1515 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1516 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1517 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1518 		overlap_done = 1;
1519 		goto out_del;
1520 	}
1521 
1522 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1523 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1524 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1525 		overlap_done = 1;
1526 		goto out_del;
1527 	}
1528 
1529 	/* Cancel state was preemptively cleared by a successful convert,
1530 	   see next comment, nothing to do. */
1531 
1532 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1533 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1534 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1535 			  lkb->lkb_id, lkb->lkb_wait_type);
1536 		return -1;
1537 	}
1538 
1539 	/* Remove for the convert reply, and premptively remove for the
1540 	   cancel reply.  A convert has been granted while there's still
1541 	   an outstanding cancel on it (the cancel is moot and the result
1542 	   in the cancel reply should be 0).  We preempt the cancel reply
1543 	   because the app gets the convert result and then can follow up
1544 	   with another op, like convert.  This subsequent op would see the
1545 	   lingering state of the cancel and fail with -EBUSY. */
1546 
1547 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1548 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1549 	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
1550 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1551 			  lkb->lkb_id);
1552 		lkb->lkb_wait_type = 0;
1553 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1554 		lkb->lkb_wait_count--;
1555 		goto out_del;
1556 	}
1557 
1558 	/* N.B. type of reply may not always correspond to type of original
1559 	   msg due to lookup->request optimization, verify others? */
1560 
1561 	if (lkb->lkb_wait_type) {
1562 		lkb->lkb_wait_type = 0;
1563 		goto out_del;
1564 	}
1565 
1566 	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1567 		  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1568 		  mstype, lkb->lkb_flags);
1569 	return -1;
1570 
1571  out_del:
1572 	/* the force-unlock/cancel has completed and we haven't recvd a reply
1573 	   to the op that was in progress prior to the unlock/cancel; we
1574 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1575 	   this would happen */
1576 
1577 	if (overlap_done && lkb->lkb_wait_type) {
1578 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1579 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1580 		lkb->lkb_wait_count--;
1581 		lkb->lkb_wait_type = 0;
1582 	}
1583 
1584 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1585 
1586 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
1587 	lkb->lkb_wait_count--;
1588 	if (!lkb->lkb_wait_count)
1589 		list_del_init(&lkb->lkb_wait_reply);
1590 	unhold_lkb(lkb);
1591 	return 0;
1592 }
1593 
1594 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1595 {
1596 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1597 	int error;
1598 
1599 	mutex_lock(&ls->ls_waiters_mutex);
1600 	error = _remove_from_waiters(lkb, mstype, NULL);
1601 	mutex_unlock(&ls->ls_waiters_mutex);
1602 	return error;
1603 }
1604 
1605 /* Handles situations where we might be processing a "fake" or "stub" reply in
1606    which we can't try to take waiters_mutex again. */
1607 
1608 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1609 {
1610 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1611 	int error;
1612 
1613 	if (ms->m_flags != DLM_IFL_STUB_MS)
1614 		mutex_lock(&ls->ls_waiters_mutex);
1615 	error = _remove_from_waiters(lkb, ms->m_type, ms);
1616 	if (ms->m_flags != DLM_IFL_STUB_MS)
1617 		mutex_unlock(&ls->ls_waiters_mutex);
1618 	return error;
1619 }
1620 
1621 /* If there's an rsb for the same resource being removed, ensure
1622    that the remove message is sent before the new lookup message.
1623    It should be rare to need a delay here, but if not, then it may
1624    be worthwhile to add a proper wait mechanism rather than a delay. */
1625 
1626 static void wait_pending_remove(struct dlm_rsb *r)
1627 {
1628 	struct dlm_ls *ls = r->res_ls;
1629  restart:
1630 	spin_lock(&ls->ls_remove_spin);
1631 	if (ls->ls_remove_len &&
1632 	    !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1633 		log_debug(ls, "delay lookup for remove dir %d %s",
1634 		  	  r->res_dir_nodeid, r->res_name);
1635 		spin_unlock(&ls->ls_remove_spin);
1636 		msleep(1);
1637 		goto restart;
1638 	}
1639 	spin_unlock(&ls->ls_remove_spin);
1640 }
1641 
1642 /*
1643  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1644  * read by other threads in wait_pending_remove.  ls_remove_names
1645  * and ls_remove_lens are only used by the scan thread, so they do
1646  * not need protection.
1647  */
1648 
1649 static void shrink_bucket(struct dlm_ls *ls, int b)
1650 {
1651 	struct rb_node *n, *next;
1652 	struct dlm_rsb *r;
1653 	char *name;
1654 	int our_nodeid = dlm_our_nodeid();
1655 	int remote_count = 0;
1656 	int need_shrink = 0;
1657 	int i, len, rv;
1658 
1659 	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1660 
1661 	spin_lock(&ls->ls_rsbtbl[b].lock);
1662 
1663 	if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1664 		spin_unlock(&ls->ls_rsbtbl[b].lock);
1665 		return;
1666 	}
1667 
1668 	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1669 		next = rb_next(n);
1670 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
1671 
1672 		/* If we're the directory record for this rsb, and
1673 		   we're not the master of it, then we need to wait
1674 		   for the master node to send us a dir remove for
1675 		   before removing the dir record. */
1676 
1677 		if (!dlm_no_directory(ls) &&
1678 		    (r->res_master_nodeid != our_nodeid) &&
1679 		    (dlm_dir_nodeid(r) == our_nodeid)) {
1680 			continue;
1681 		}
1682 
1683 		need_shrink = 1;
1684 
1685 		if (!time_after_eq(jiffies, r->res_toss_time +
1686 				   dlm_config.ci_toss_secs * HZ)) {
1687 			continue;
1688 		}
1689 
1690 		if (!dlm_no_directory(ls) &&
1691 		    (r->res_master_nodeid == our_nodeid) &&
1692 		    (dlm_dir_nodeid(r) != our_nodeid)) {
1693 
1694 			/* We're the master of this rsb but we're not
1695 			   the directory record, so we need to tell the
1696 			   dir node to remove the dir record. */
1697 
1698 			ls->ls_remove_lens[remote_count] = r->res_length;
1699 			memcpy(ls->ls_remove_names[remote_count], r->res_name,
1700 			       DLM_RESNAME_MAXLEN);
1701 			remote_count++;
1702 
1703 			if (remote_count >= DLM_REMOVE_NAMES_MAX)
1704 				break;
1705 			continue;
1706 		}
1707 
1708 		if (!kref_put(&r->res_ref, kill_rsb)) {
1709 			log_error(ls, "tossed rsb in use %s", r->res_name);
1710 			continue;
1711 		}
1712 
1713 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1714 		dlm_free_rsb(r);
1715 	}
1716 
1717 	if (need_shrink)
1718 		ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1719 	else
1720 		ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1721 	spin_unlock(&ls->ls_rsbtbl[b].lock);
1722 
1723 	/*
1724 	 * While searching for rsb's to free, we found some that require
1725 	 * remote removal.  We leave them in place and find them again here
1726 	 * so there is a very small gap between removing them from the toss
1727 	 * list and sending the removal.  Keeping this gap small is
1728 	 * important to keep us (the master node) from being out of sync
1729 	 * with the remote dir node for very long.
1730 	 *
1731 	 * From the time the rsb is removed from toss until just after
1732 	 * send_remove, the rsb name is saved in ls_remove_name.  A new
1733 	 * lookup checks this to ensure that a new lookup message for the
1734 	 * same resource name is not sent just before the remove message.
1735 	 */
1736 
1737 	for (i = 0; i < remote_count; i++) {
1738 		name = ls->ls_remove_names[i];
1739 		len = ls->ls_remove_lens[i];
1740 
1741 		spin_lock(&ls->ls_rsbtbl[b].lock);
1742 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1743 		if (rv) {
1744 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1745 			log_debug(ls, "remove_name not toss %s", name);
1746 			continue;
1747 		}
1748 
1749 		if (r->res_master_nodeid != our_nodeid) {
1750 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1751 			log_debug(ls, "remove_name master %d dir %d our %d %s",
1752 				  r->res_master_nodeid, r->res_dir_nodeid,
1753 				  our_nodeid, name);
1754 			continue;
1755 		}
1756 
1757 		if (r->res_dir_nodeid == our_nodeid) {
1758 			/* should never happen */
1759 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1760 			log_error(ls, "remove_name dir %d master %d our %d %s",
1761 				  r->res_dir_nodeid, r->res_master_nodeid,
1762 				  our_nodeid, name);
1763 			continue;
1764 		}
1765 
1766 		if (!time_after_eq(jiffies, r->res_toss_time +
1767 				   dlm_config.ci_toss_secs * HZ)) {
1768 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1769 			log_debug(ls, "remove_name toss_time %lu now %lu %s",
1770 				  r->res_toss_time, jiffies, name);
1771 			continue;
1772 		}
1773 
1774 		if (!kref_put(&r->res_ref, kill_rsb)) {
1775 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1776 			log_error(ls, "remove_name in use %s", name);
1777 			continue;
1778 		}
1779 
1780 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1781 
1782 		/* block lookup of same name until we've sent remove */
1783 		spin_lock(&ls->ls_remove_spin);
1784 		ls->ls_remove_len = len;
1785 		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1786 		spin_unlock(&ls->ls_remove_spin);
1787 		spin_unlock(&ls->ls_rsbtbl[b].lock);
1788 
1789 		send_remove(r);
1790 
1791 		/* allow lookup of name again */
1792 		spin_lock(&ls->ls_remove_spin);
1793 		ls->ls_remove_len = 0;
1794 		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1795 		spin_unlock(&ls->ls_remove_spin);
1796 
1797 		dlm_free_rsb(r);
1798 	}
1799 }
1800 
1801 void dlm_scan_rsbs(struct dlm_ls *ls)
1802 {
1803 	int i;
1804 
1805 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1806 		shrink_bucket(ls, i);
1807 		if (dlm_locking_stopped(ls))
1808 			break;
1809 		cond_resched();
1810 	}
1811 }
1812 
1813 static void add_timeout(struct dlm_lkb *lkb)
1814 {
1815 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1816 
1817 	if (is_master_copy(lkb))
1818 		return;
1819 
1820 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1821 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1822 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1823 		goto add_it;
1824 	}
1825 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1826 		goto add_it;
1827 	return;
1828 
1829  add_it:
1830 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1831 	mutex_lock(&ls->ls_timeout_mutex);
1832 	hold_lkb(lkb);
1833 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1834 	mutex_unlock(&ls->ls_timeout_mutex);
1835 }
1836 
1837 static void del_timeout(struct dlm_lkb *lkb)
1838 {
1839 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1840 
1841 	mutex_lock(&ls->ls_timeout_mutex);
1842 	if (!list_empty(&lkb->lkb_time_list)) {
1843 		list_del_init(&lkb->lkb_time_list);
1844 		unhold_lkb(lkb);
1845 	}
1846 	mutex_unlock(&ls->ls_timeout_mutex);
1847 }
1848 
1849 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1850    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1851    and then lock rsb because of lock ordering in add_timeout.  We may need
1852    to specify some special timeout-related bits in the lkb that are just to
1853    be accessed under the timeout_mutex. */
1854 
1855 void dlm_scan_timeout(struct dlm_ls *ls)
1856 {
1857 	struct dlm_rsb *r;
1858 	struct dlm_lkb *lkb;
1859 	int do_cancel, do_warn;
1860 	s64 wait_us;
1861 
1862 	for (;;) {
1863 		if (dlm_locking_stopped(ls))
1864 			break;
1865 
1866 		do_cancel = 0;
1867 		do_warn = 0;
1868 		mutex_lock(&ls->ls_timeout_mutex);
1869 		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1870 
1871 			wait_us = ktime_to_us(ktime_sub(ktime_get(),
1872 					      		lkb->lkb_timestamp));
1873 
1874 			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1875 			    wait_us >= (lkb->lkb_timeout_cs * 10000))
1876 				do_cancel = 1;
1877 
1878 			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1879 			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
1880 				do_warn = 1;
1881 
1882 			if (!do_cancel && !do_warn)
1883 				continue;
1884 			hold_lkb(lkb);
1885 			break;
1886 		}
1887 		mutex_unlock(&ls->ls_timeout_mutex);
1888 
1889 		if (!do_cancel && !do_warn)
1890 			break;
1891 
1892 		r = lkb->lkb_resource;
1893 		hold_rsb(r);
1894 		lock_rsb(r);
1895 
1896 		if (do_warn) {
1897 			/* clear flag so we only warn once */
1898 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1899 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1900 				del_timeout(lkb);
1901 			dlm_timeout_warn(lkb);
1902 		}
1903 
1904 		if (do_cancel) {
1905 			log_debug(ls, "timeout cancel %x node %d %s",
1906 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1907 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1908 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1909 			del_timeout(lkb);
1910 			_cancel_lock(r, lkb);
1911 		}
1912 
1913 		unlock_rsb(r);
1914 		unhold_rsb(r);
1915 		dlm_put_lkb(lkb);
1916 	}
1917 }
1918 
1919 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1920    dlm_recoverd before checking/setting ls_recover_begin. */
1921 
1922 void dlm_adjust_timeouts(struct dlm_ls *ls)
1923 {
1924 	struct dlm_lkb *lkb;
1925 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1926 
1927 	ls->ls_recover_begin = 0;
1928 	mutex_lock(&ls->ls_timeout_mutex);
1929 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1930 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1931 	mutex_unlock(&ls->ls_timeout_mutex);
1932 
1933 	if (!dlm_config.ci_waitwarn_us)
1934 		return;
1935 
1936 	mutex_lock(&ls->ls_waiters_mutex);
1937 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1938 		if (ktime_to_us(lkb->lkb_wait_time))
1939 			lkb->lkb_wait_time = ktime_get();
1940 	}
1941 	mutex_unlock(&ls->ls_waiters_mutex);
1942 }
1943 
1944 /* lkb is master or local copy */
1945 
1946 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1947 {
1948 	int b, len = r->res_ls->ls_lvblen;
1949 
1950 	/* b=1 lvb returned to caller
1951 	   b=0 lvb written to rsb or invalidated
1952 	   b=-1 do nothing */
1953 
1954 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1955 
1956 	if (b == 1) {
1957 		if (!lkb->lkb_lvbptr)
1958 			return;
1959 
1960 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1961 			return;
1962 
1963 		if (!r->res_lvbptr)
1964 			return;
1965 
1966 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1967 		lkb->lkb_lvbseq = r->res_lvbseq;
1968 
1969 	} else if (b == 0) {
1970 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1971 			rsb_set_flag(r, RSB_VALNOTVALID);
1972 			return;
1973 		}
1974 
1975 		if (!lkb->lkb_lvbptr)
1976 			return;
1977 
1978 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1979 			return;
1980 
1981 		if (!r->res_lvbptr)
1982 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1983 
1984 		if (!r->res_lvbptr)
1985 			return;
1986 
1987 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1988 		r->res_lvbseq++;
1989 		lkb->lkb_lvbseq = r->res_lvbseq;
1990 		rsb_clear_flag(r, RSB_VALNOTVALID);
1991 	}
1992 
1993 	if (rsb_flag(r, RSB_VALNOTVALID))
1994 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1995 }
1996 
1997 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998 {
1999 	if (lkb->lkb_grmode < DLM_LOCK_PW)
2000 		return;
2001 
2002 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2003 		rsb_set_flag(r, RSB_VALNOTVALID);
2004 		return;
2005 	}
2006 
2007 	if (!lkb->lkb_lvbptr)
2008 		return;
2009 
2010 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2011 		return;
2012 
2013 	if (!r->res_lvbptr)
2014 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2015 
2016 	if (!r->res_lvbptr)
2017 		return;
2018 
2019 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2020 	r->res_lvbseq++;
2021 	rsb_clear_flag(r, RSB_VALNOTVALID);
2022 }
2023 
2024 /* lkb is process copy (pc) */
2025 
2026 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2027 			    struct dlm_message *ms)
2028 {
2029 	int b;
2030 
2031 	if (!lkb->lkb_lvbptr)
2032 		return;
2033 
2034 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2035 		return;
2036 
2037 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2038 	if (b == 1) {
2039 		int len = receive_extralen(ms);
2040 		if (len > r->res_ls->ls_lvblen)
2041 			len = r->res_ls->ls_lvblen;
2042 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2043 		lkb->lkb_lvbseq = ms->m_lvbseq;
2044 	}
2045 }
2046 
2047 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2048    remove_lock -- used for unlock, removes lkb from granted
2049    revert_lock -- used for cancel, moves lkb from convert to granted
2050    grant_lock  -- used for request and convert, adds lkb to granted or
2051                   moves lkb from convert or waiting to granted
2052 
2053    Each of these is used for master or local copy lkb's.  There is
2054    also a _pc() variation used to make the corresponding change on
2055    a process copy (pc) lkb. */
2056 
2057 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2058 {
2059 	del_lkb(r, lkb);
2060 	lkb->lkb_grmode = DLM_LOCK_IV;
2061 	/* this unhold undoes the original ref from create_lkb()
2062 	   so this leads to the lkb being freed */
2063 	unhold_lkb(lkb);
2064 }
2065 
2066 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2067 {
2068 	set_lvb_unlock(r, lkb);
2069 	_remove_lock(r, lkb);
2070 }
2071 
2072 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2073 {
2074 	_remove_lock(r, lkb);
2075 }
2076 
2077 /* returns: 0 did nothing
2078 	    1 moved lock to granted
2079 	   -1 removed lock */
2080 
2081 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2082 {
2083 	int rv = 0;
2084 
2085 	lkb->lkb_rqmode = DLM_LOCK_IV;
2086 
2087 	switch (lkb->lkb_status) {
2088 	case DLM_LKSTS_GRANTED:
2089 		break;
2090 	case DLM_LKSTS_CONVERT:
2091 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2092 		rv = 1;
2093 		break;
2094 	case DLM_LKSTS_WAITING:
2095 		del_lkb(r, lkb);
2096 		lkb->lkb_grmode = DLM_LOCK_IV;
2097 		/* this unhold undoes the original ref from create_lkb()
2098 		   so this leads to the lkb being freed */
2099 		unhold_lkb(lkb);
2100 		rv = -1;
2101 		break;
2102 	default:
2103 		log_print("invalid status for revert %d", lkb->lkb_status);
2104 	}
2105 	return rv;
2106 }
2107 
2108 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2109 {
2110 	return revert_lock(r, lkb);
2111 }
2112 
2113 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2114 {
2115 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2116 		lkb->lkb_grmode = lkb->lkb_rqmode;
2117 		if (lkb->lkb_status)
2118 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2119 		else
2120 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2121 	}
2122 
2123 	lkb->lkb_rqmode = DLM_LOCK_IV;
2124 	lkb->lkb_highbast = 0;
2125 }
2126 
2127 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2128 {
2129 	set_lvb_lock(r, lkb);
2130 	_grant_lock(r, lkb);
2131 }
2132 
2133 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2134 			  struct dlm_message *ms)
2135 {
2136 	set_lvb_lock_pc(r, lkb, ms);
2137 	_grant_lock(r, lkb);
2138 }
2139 
2140 /* called by grant_pending_locks() which means an async grant message must
2141    be sent to the requesting node in addition to granting the lock if the
2142    lkb belongs to a remote node. */
2143 
2144 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2145 {
2146 	grant_lock(r, lkb);
2147 	if (is_master_copy(lkb))
2148 		send_grant(r, lkb);
2149 	else
2150 		queue_cast(r, lkb, 0);
2151 }
2152 
2153 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2154    change the granted/requested modes.  We're munging things accordingly in
2155    the process copy.
2156    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2157    conversion deadlock
2158    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2159    compatible with other granted locks */
2160 
2161 static void munge_demoted(struct dlm_lkb *lkb)
2162 {
2163 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2164 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2165 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2166 		return;
2167 	}
2168 
2169 	lkb->lkb_grmode = DLM_LOCK_NL;
2170 }
2171 
2172 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2173 {
2174 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2175 	    ms->m_type != DLM_MSG_GRANT) {
2176 		log_print("munge_altmode %x invalid reply type %d",
2177 			  lkb->lkb_id, ms->m_type);
2178 		return;
2179 	}
2180 
2181 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2182 		lkb->lkb_rqmode = DLM_LOCK_PR;
2183 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2184 		lkb->lkb_rqmode = DLM_LOCK_CW;
2185 	else {
2186 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2187 		dlm_print_lkb(lkb);
2188 	}
2189 }
2190 
2191 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2192 {
2193 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2194 					   lkb_statequeue);
2195 	if (lkb->lkb_id == first->lkb_id)
2196 		return 1;
2197 
2198 	return 0;
2199 }
2200 
2201 /* Check if the given lkb conflicts with another lkb on the queue. */
2202 
2203 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2204 {
2205 	struct dlm_lkb *this;
2206 
2207 	list_for_each_entry(this, head, lkb_statequeue) {
2208 		if (this == lkb)
2209 			continue;
2210 		if (!modes_compat(this, lkb))
2211 			return 1;
2212 	}
2213 	return 0;
2214 }
2215 
2216 /*
2217  * "A conversion deadlock arises with a pair of lock requests in the converting
2218  * queue for one resource.  The granted mode of each lock blocks the requested
2219  * mode of the other lock."
2220  *
2221  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2222  * convert queue from being granted, then deadlk/demote lkb.
2223  *
2224  * Example:
2225  * Granted Queue: empty
2226  * Convert Queue: NL->EX (first lock)
2227  *                PR->EX (second lock)
2228  *
2229  * The first lock can't be granted because of the granted mode of the second
2230  * lock and the second lock can't be granted because it's not first in the
2231  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2232  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2233  * flag set and return DEMOTED in the lksb flags.
2234  *
2235  * Originally, this function detected conv-deadlk in a more limited scope:
2236  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2237  * - if lkb1 was the first entry in the queue (not just earlier), and was
2238  *   blocked by the granted mode of lkb2, and there was nothing on the
2239  *   granted queue preventing lkb1 from being granted immediately, i.e.
2240  *   lkb2 was the only thing preventing lkb1 from being granted.
2241  *
2242  * That second condition meant we'd only say there was conv-deadlk if
2243  * resolving it (by demotion) would lead to the first lock on the convert
2244  * queue being granted right away.  It allowed conversion deadlocks to exist
2245  * between locks on the convert queue while they couldn't be granted anyway.
2246  *
2247  * Now, we detect and take action on conversion deadlocks immediately when
2248  * they're created, even if they may not be immediately consequential.  If
2249  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2250  * mode that would prevent lkb1's conversion from being granted, we do a
2251  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2252  * I think this means that the lkb_is_ahead condition below should always
2253  * be zero, i.e. there will never be conv-deadlk between two locks that are
2254  * both already on the convert queue.
2255  */
2256 
2257 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2258 {
2259 	struct dlm_lkb *lkb1;
2260 	int lkb_is_ahead = 0;
2261 
2262 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2263 		if (lkb1 == lkb2) {
2264 			lkb_is_ahead = 1;
2265 			continue;
2266 		}
2267 
2268 		if (!lkb_is_ahead) {
2269 			if (!modes_compat(lkb2, lkb1))
2270 				return 1;
2271 		} else {
2272 			if (!modes_compat(lkb2, lkb1) &&
2273 			    !modes_compat(lkb1, lkb2))
2274 				return 1;
2275 		}
2276 	}
2277 	return 0;
2278 }
2279 
2280 /*
2281  * Return 1 if the lock can be granted, 0 otherwise.
2282  * Also detect and resolve conversion deadlocks.
2283  *
2284  * lkb is the lock to be granted
2285  *
2286  * now is 1 if the function is being called in the context of the
2287  * immediate request, it is 0 if called later, after the lock has been
2288  * queued.
2289  *
2290  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2291  * after recovery.
2292  *
2293  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2294  */
2295 
2296 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2297 			   int recover)
2298 {
2299 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2300 
2301 	/*
2302 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2303 	 * a new request for a NL mode lock being blocked.
2304 	 *
2305 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2306 	 * request, then it would be granted.  In essence, the use of this flag
2307 	 * tells the Lock Manager to expedite theis request by not considering
2308 	 * what may be in the CONVERTING or WAITING queues...  As of this
2309 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2310 	 * mode locks.  This flag is not valid for conversion requests.
2311 	 *
2312 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2313 	 * conversion or used with a non-NL requested mode.  We also know an
2314 	 * EXPEDITE request is always granted immediately, so now must always
2315 	 * be 1.  The full condition to grant an expedite request: (now &&
2316 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2317 	 * therefore be shortened to just checking the flag.
2318 	 */
2319 
2320 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2321 		return 1;
2322 
2323 	/*
2324 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2325 	 * added to the remaining conditions.
2326 	 */
2327 
2328 	if (queue_conflict(&r->res_grantqueue, lkb))
2329 		return 0;
2330 
2331 	/*
2332 	 * 6-3: By default, a conversion request is immediately granted if the
2333 	 * requested mode is compatible with the modes of all other granted
2334 	 * locks
2335 	 */
2336 
2337 	if (queue_conflict(&r->res_convertqueue, lkb))
2338 		return 0;
2339 
2340 	/*
2341 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2342 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2343 	 * The lkb's may have been rebuilt on the queues in a different
2344 	 * order than they were in on the previous master.  So, granting
2345 	 * queued conversions in order after recovery doesn't make sense
2346 	 * since the order hasn't been preserved anyway.  The new order
2347 	 * could also have created a new "in place" conversion deadlock.
2348 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2349 	 * After recovery, there would be no granted locks, and possibly
2350 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2351 	 * recovery, grant conversions without considering order.
2352 	 */
2353 
2354 	if (conv && recover)
2355 		return 1;
2356 
2357 	/*
2358 	 * 6-5: But the default algorithm for deciding whether to grant or
2359 	 * queue conversion requests does not by itself guarantee that such
2360 	 * requests are serviced on a "first come first serve" basis.  This, in
2361 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2362 	 *
2363 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2364 	 * the system service employed to request a lock conversion.  This flag
2365 	 * forces certain conversion requests to be queued, even if they are
2366 	 * compatible with the granted modes of other locks on the same
2367 	 * resource.  Thus, the use of this flag results in conversion requests
2368 	 * being ordered on a "first come first servce" basis.
2369 	 *
2370 	 * DCT: This condition is all about new conversions being able to occur
2371 	 * "in place" while the lock remains on the granted queue (assuming
2372 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2373 	 * doesn't _have_ to go onto the convert queue where it's processed in
2374 	 * order.  The "now" variable is necessary to distinguish converts
2375 	 * being received and processed for the first time now, because once a
2376 	 * convert is moved to the conversion queue the condition below applies
2377 	 * requiring fifo granting.
2378 	 */
2379 
2380 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2381 		return 1;
2382 
2383 	/*
2384 	 * Even if the convert is compat with all granted locks,
2385 	 * QUECVT forces it behind other locks on the convert queue.
2386 	 */
2387 
2388 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2389 		if (list_empty(&r->res_convertqueue))
2390 			return 1;
2391 		else
2392 			return 0;
2393 	}
2394 
2395 	/*
2396 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2397 	 * order.
2398 	 */
2399 
2400 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2401 		return 1;
2402 
2403 	/*
2404 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2405 	 * granted until all other conversion requests ahead of it are granted
2406 	 * and/or canceled.
2407 	 */
2408 
2409 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2410 		return 1;
2411 
2412 	/*
2413 	 * 6-4: By default, a new request is immediately granted only if all
2414 	 * three of the following conditions are satisfied when the request is
2415 	 * issued:
2416 	 * - The queue of ungranted conversion requests for the resource is
2417 	 *   empty.
2418 	 * - The queue of ungranted new requests for the resource is empty.
2419 	 * - The mode of the new request is compatible with the most
2420 	 *   restrictive mode of all granted locks on the resource.
2421 	 */
2422 
2423 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2424 	    list_empty(&r->res_waitqueue))
2425 		return 1;
2426 
2427 	/*
2428 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2429 	 * it cannot be granted until the queue of ungranted conversion
2430 	 * requests is empty, all ungranted new requests ahead of it are
2431 	 * granted and/or canceled, and it is compatible with the granted mode
2432 	 * of the most restrictive lock granted on the resource.
2433 	 */
2434 
2435 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2436 	    first_in_list(lkb, &r->res_waitqueue))
2437 		return 1;
2438 
2439 	return 0;
2440 }
2441 
2442 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2443 			  int recover, int *err)
2444 {
2445 	int rv;
2446 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2447 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2448 
2449 	if (err)
2450 		*err = 0;
2451 
2452 	rv = _can_be_granted(r, lkb, now, recover);
2453 	if (rv)
2454 		goto out;
2455 
2456 	/*
2457 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2458 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2459 	 * cancels one of the locks.
2460 	 */
2461 
2462 	if (is_convert && can_be_queued(lkb) &&
2463 	    conversion_deadlock_detect(r, lkb)) {
2464 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2465 			lkb->lkb_grmode = DLM_LOCK_NL;
2466 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2467 		} else if (err) {
2468 			*err = -EDEADLK;
2469 		} else {
2470 			log_print("can_be_granted deadlock %x now %d",
2471 				  lkb->lkb_id, now);
2472 			dlm_dump_rsb(r);
2473 		}
2474 		goto out;
2475 	}
2476 
2477 	/*
2478 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2479 	 * to grant a request in a mode other than the normal rqmode.  It's a
2480 	 * simple way to provide a big optimization to applications that can
2481 	 * use them.
2482 	 */
2483 
2484 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2485 		alt = DLM_LOCK_PR;
2486 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2487 		alt = DLM_LOCK_CW;
2488 
2489 	if (alt) {
2490 		lkb->lkb_rqmode = alt;
2491 		rv = _can_be_granted(r, lkb, now, 0);
2492 		if (rv)
2493 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2494 		else
2495 			lkb->lkb_rqmode = rqmode;
2496 	}
2497  out:
2498 	return rv;
2499 }
2500 
2501 /* Returns the highest requested mode of all blocked conversions; sets
2502    cw if there's a blocked conversion to DLM_LOCK_CW. */
2503 
2504 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2505 				 unsigned int *count)
2506 {
2507 	struct dlm_lkb *lkb, *s;
2508 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2509 	int hi, demoted, quit, grant_restart, demote_restart;
2510 	int deadlk;
2511 
2512 	quit = 0;
2513  restart:
2514 	grant_restart = 0;
2515 	demote_restart = 0;
2516 	hi = DLM_LOCK_IV;
2517 
2518 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2519 		demoted = is_demoted(lkb);
2520 		deadlk = 0;
2521 
2522 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2523 			grant_lock_pending(r, lkb);
2524 			grant_restart = 1;
2525 			if (count)
2526 				(*count)++;
2527 			continue;
2528 		}
2529 
2530 		if (!demoted && is_demoted(lkb)) {
2531 			log_print("WARN: pending demoted %x node %d %s",
2532 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2533 			demote_restart = 1;
2534 			continue;
2535 		}
2536 
2537 		if (deadlk) {
2538 			/*
2539 			 * If DLM_LKB_NODLKWT flag is set and conversion
2540 			 * deadlock is detected, we request blocking AST and
2541 			 * down (or cancel) conversion.
2542 			 */
2543 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2544 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2545 					queue_bast(r, lkb, lkb->lkb_rqmode);
2546 					lkb->lkb_highbast = lkb->lkb_rqmode;
2547 				}
2548 			} else {
2549 				log_print("WARN: pending deadlock %x node %d %s",
2550 					  lkb->lkb_id, lkb->lkb_nodeid,
2551 					  r->res_name);
2552 				dlm_dump_rsb(r);
2553 			}
2554 			continue;
2555 		}
2556 
2557 		hi = max_t(int, lkb->lkb_rqmode, hi);
2558 
2559 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2560 			*cw = 1;
2561 	}
2562 
2563 	if (grant_restart)
2564 		goto restart;
2565 	if (demote_restart && !quit) {
2566 		quit = 1;
2567 		goto restart;
2568 	}
2569 
2570 	return max_t(int, high, hi);
2571 }
2572 
2573 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2574 			      unsigned int *count)
2575 {
2576 	struct dlm_lkb *lkb, *s;
2577 
2578 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2579 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2580 			grant_lock_pending(r, lkb);
2581 			if (count)
2582 				(*count)++;
2583 		} else {
2584 			high = max_t(int, lkb->lkb_rqmode, high);
2585 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2586 				*cw = 1;
2587 		}
2588 	}
2589 
2590 	return high;
2591 }
2592 
2593 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2594    on either the convert or waiting queue.
2595    high is the largest rqmode of all locks blocked on the convert or
2596    waiting queue. */
2597 
2598 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2599 {
2600 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2601 		if (gr->lkb_highbast < DLM_LOCK_EX)
2602 			return 1;
2603 		return 0;
2604 	}
2605 
2606 	if (gr->lkb_highbast < high &&
2607 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2608 		return 1;
2609 	return 0;
2610 }
2611 
2612 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2613 {
2614 	struct dlm_lkb *lkb, *s;
2615 	int high = DLM_LOCK_IV;
2616 	int cw = 0;
2617 
2618 	if (!is_master(r)) {
2619 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2620 		dlm_dump_rsb(r);
2621 		return;
2622 	}
2623 
2624 	high = grant_pending_convert(r, high, &cw, count);
2625 	high = grant_pending_wait(r, high, &cw, count);
2626 
2627 	if (high == DLM_LOCK_IV)
2628 		return;
2629 
2630 	/*
2631 	 * If there are locks left on the wait/convert queue then send blocking
2632 	 * ASTs to granted locks based on the largest requested mode (high)
2633 	 * found above.
2634 	 */
2635 
2636 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2637 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2638 			if (cw && high == DLM_LOCK_PR &&
2639 			    lkb->lkb_grmode == DLM_LOCK_PR)
2640 				queue_bast(r, lkb, DLM_LOCK_CW);
2641 			else
2642 				queue_bast(r, lkb, high);
2643 			lkb->lkb_highbast = high;
2644 		}
2645 	}
2646 }
2647 
2648 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2649 {
2650 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2651 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2652 		if (gr->lkb_highbast < DLM_LOCK_EX)
2653 			return 1;
2654 		return 0;
2655 	}
2656 
2657 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2658 		return 1;
2659 	return 0;
2660 }
2661 
2662 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2663 			    struct dlm_lkb *lkb)
2664 {
2665 	struct dlm_lkb *gr;
2666 
2667 	list_for_each_entry(gr, head, lkb_statequeue) {
2668 		/* skip self when sending basts to convertqueue */
2669 		if (gr == lkb)
2670 			continue;
2671 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2672 			queue_bast(r, gr, lkb->lkb_rqmode);
2673 			gr->lkb_highbast = lkb->lkb_rqmode;
2674 		}
2675 	}
2676 }
2677 
2678 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2679 {
2680 	send_bast_queue(r, &r->res_grantqueue, lkb);
2681 }
2682 
2683 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2684 {
2685 	send_bast_queue(r, &r->res_grantqueue, lkb);
2686 	send_bast_queue(r, &r->res_convertqueue, lkb);
2687 }
2688 
2689 /* set_master(r, lkb) -- set the master nodeid of a resource
2690 
2691    The purpose of this function is to set the nodeid field in the given
2692    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2693    known, it can just be copied to the lkb and the function will return
2694    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2695    before it can be copied to the lkb.
2696 
2697    When the rsb nodeid is being looked up remotely, the initial lkb
2698    causing the lookup is kept on the ls_waiters list waiting for the
2699    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2700    on the rsb's res_lookup list until the master is verified.
2701 
2702    Return values:
2703    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2704    1: the rsb master is not available and the lkb has been placed on
2705       a wait queue
2706 */
2707 
2708 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2709 {
2710 	int our_nodeid = dlm_our_nodeid();
2711 
2712 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2713 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2714 		r->res_first_lkid = lkb->lkb_id;
2715 		lkb->lkb_nodeid = r->res_nodeid;
2716 		return 0;
2717 	}
2718 
2719 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2720 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2721 		return 1;
2722 	}
2723 
2724 	if (r->res_master_nodeid == our_nodeid) {
2725 		lkb->lkb_nodeid = 0;
2726 		return 0;
2727 	}
2728 
2729 	if (r->res_master_nodeid) {
2730 		lkb->lkb_nodeid = r->res_master_nodeid;
2731 		return 0;
2732 	}
2733 
2734 	if (dlm_dir_nodeid(r) == our_nodeid) {
2735 		/* This is a somewhat unusual case; find_rsb will usually
2736 		   have set res_master_nodeid when dir nodeid is local, but
2737 		   there are cases where we become the dir node after we've
2738 		   past find_rsb and go through _request_lock again.
2739 		   confirm_master() or process_lookup_list() needs to be
2740 		   called after this. */
2741 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2742 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2743 			  r->res_name);
2744 		r->res_master_nodeid = our_nodeid;
2745 		r->res_nodeid = 0;
2746 		lkb->lkb_nodeid = 0;
2747 		return 0;
2748 	}
2749 
2750 	wait_pending_remove(r);
2751 
2752 	r->res_first_lkid = lkb->lkb_id;
2753 	send_lookup(r, lkb);
2754 	return 1;
2755 }
2756 
2757 static void process_lookup_list(struct dlm_rsb *r)
2758 {
2759 	struct dlm_lkb *lkb, *safe;
2760 
2761 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2762 		list_del_init(&lkb->lkb_rsb_lookup);
2763 		_request_lock(r, lkb);
2764 		schedule();
2765 	}
2766 }
2767 
2768 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2769 
2770 static void confirm_master(struct dlm_rsb *r, int error)
2771 {
2772 	struct dlm_lkb *lkb;
2773 
2774 	if (!r->res_first_lkid)
2775 		return;
2776 
2777 	switch (error) {
2778 	case 0:
2779 	case -EINPROGRESS:
2780 		r->res_first_lkid = 0;
2781 		process_lookup_list(r);
2782 		break;
2783 
2784 	case -EAGAIN:
2785 	case -EBADR:
2786 	case -ENOTBLK:
2787 		/* the remote request failed and won't be retried (it was
2788 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2789 		   lkb the first_lkid */
2790 
2791 		r->res_first_lkid = 0;
2792 
2793 		if (!list_empty(&r->res_lookup)) {
2794 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2795 					 lkb_rsb_lookup);
2796 			list_del_init(&lkb->lkb_rsb_lookup);
2797 			r->res_first_lkid = lkb->lkb_id;
2798 			_request_lock(r, lkb);
2799 		}
2800 		break;
2801 
2802 	default:
2803 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2804 	}
2805 }
2806 
2807 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2808 			 int namelen, unsigned long timeout_cs,
2809 			 void (*ast) (void *astparam),
2810 			 void *astparam,
2811 			 void (*bast) (void *astparam, int mode),
2812 			 struct dlm_args *args)
2813 {
2814 	int rv = -EINVAL;
2815 
2816 	/* check for invalid arg usage */
2817 
2818 	if (mode < 0 || mode > DLM_LOCK_EX)
2819 		goto out;
2820 
2821 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2822 		goto out;
2823 
2824 	if (flags & DLM_LKF_CANCEL)
2825 		goto out;
2826 
2827 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2828 		goto out;
2829 
2830 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2831 		goto out;
2832 
2833 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2834 		goto out;
2835 
2836 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2837 		goto out;
2838 
2839 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2840 		goto out;
2841 
2842 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2843 		goto out;
2844 
2845 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2846 		goto out;
2847 
2848 	if (!ast || !lksb)
2849 		goto out;
2850 
2851 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2852 		goto out;
2853 
2854 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2855 		goto out;
2856 
2857 	/* these args will be copied to the lkb in validate_lock_args,
2858 	   it cannot be done now because when converting locks, fields in
2859 	   an active lkb cannot be modified before locking the rsb */
2860 
2861 	args->flags = flags;
2862 	args->astfn = ast;
2863 	args->astparam = astparam;
2864 	args->bastfn = bast;
2865 	args->timeout = timeout_cs;
2866 	args->mode = mode;
2867 	args->lksb = lksb;
2868 	rv = 0;
2869  out:
2870 	return rv;
2871 }
2872 
2873 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2874 {
2875 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2876  		      DLM_LKF_FORCEUNLOCK))
2877 		return -EINVAL;
2878 
2879 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2880 		return -EINVAL;
2881 
2882 	args->flags = flags;
2883 	args->astparam = astarg;
2884 	return 0;
2885 }
2886 
2887 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2888 			      struct dlm_args *args)
2889 {
2890 	int rv = -EINVAL;
2891 
2892 	if (args->flags & DLM_LKF_CONVERT) {
2893 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2894 			goto out;
2895 
2896 		if (args->flags & DLM_LKF_QUECVT &&
2897 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2898 			goto out;
2899 
2900 		rv = -EBUSY;
2901 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2902 			goto out;
2903 
2904 		if (lkb->lkb_wait_type)
2905 			goto out;
2906 
2907 		if (is_overlap(lkb))
2908 			goto out;
2909 	}
2910 
2911 	lkb->lkb_exflags = args->flags;
2912 	lkb->lkb_sbflags = 0;
2913 	lkb->lkb_astfn = args->astfn;
2914 	lkb->lkb_astparam = args->astparam;
2915 	lkb->lkb_bastfn = args->bastfn;
2916 	lkb->lkb_rqmode = args->mode;
2917 	lkb->lkb_lksb = args->lksb;
2918 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2919 	lkb->lkb_ownpid = (int) current->pid;
2920 	lkb->lkb_timeout_cs = args->timeout;
2921 	rv = 0;
2922  out:
2923 	if (rv)
2924 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2925 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2926 			  lkb->lkb_status, lkb->lkb_wait_type,
2927 			  lkb->lkb_resource->res_name);
2928 	return rv;
2929 }
2930 
2931 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2932    for success */
2933 
2934 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2935    because there may be a lookup in progress and it's valid to do
2936    cancel/unlockf on it */
2937 
2938 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2939 {
2940 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2941 	int rv = -EINVAL;
2942 
2943 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2944 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2945 		dlm_print_lkb(lkb);
2946 		goto out;
2947 	}
2948 
2949 	/* an lkb may still exist even though the lock is EOL'ed due to a
2950 	   cancel, unlock or failed noqueue request; an app can't use these
2951 	   locks; return same error as if the lkid had not been found at all */
2952 
2953 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2954 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2955 		rv = -ENOENT;
2956 		goto out;
2957 	}
2958 
2959 	/* an lkb may be waiting for an rsb lookup to complete where the
2960 	   lookup was initiated by another lock */
2961 
2962 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2963 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2964 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2965 			list_del_init(&lkb->lkb_rsb_lookup);
2966 			queue_cast(lkb->lkb_resource, lkb,
2967 				   args->flags & DLM_LKF_CANCEL ?
2968 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2969 			unhold_lkb(lkb); /* undoes create_lkb() */
2970 		}
2971 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2972 		rv = -EBUSY;
2973 		goto out;
2974 	}
2975 
2976 	/* cancel not allowed with another cancel/unlock in progress */
2977 
2978 	if (args->flags & DLM_LKF_CANCEL) {
2979 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2980 			goto out;
2981 
2982 		if (is_overlap(lkb))
2983 			goto out;
2984 
2985 		/* don't let scand try to do a cancel */
2986 		del_timeout(lkb);
2987 
2988 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2989 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2990 			rv = -EBUSY;
2991 			goto out;
2992 		}
2993 
2994 		/* there's nothing to cancel */
2995 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2996 		    !lkb->lkb_wait_type) {
2997 			rv = -EBUSY;
2998 			goto out;
2999 		}
3000 
3001 		switch (lkb->lkb_wait_type) {
3002 		case DLM_MSG_LOOKUP:
3003 		case DLM_MSG_REQUEST:
3004 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3005 			rv = -EBUSY;
3006 			goto out;
3007 		case DLM_MSG_UNLOCK:
3008 		case DLM_MSG_CANCEL:
3009 			goto out;
3010 		}
3011 		/* add_to_waiters() will set OVERLAP_CANCEL */
3012 		goto out_ok;
3013 	}
3014 
3015 	/* do we need to allow a force-unlock if there's a normal unlock
3016 	   already in progress?  in what conditions could the normal unlock
3017 	   fail such that we'd want to send a force-unlock to be sure? */
3018 
3019 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
3020 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3021 			goto out;
3022 
3023 		if (is_overlap_unlock(lkb))
3024 			goto out;
3025 
3026 		/* don't let scand try to do a cancel */
3027 		del_timeout(lkb);
3028 
3029 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3030 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3031 			rv = -EBUSY;
3032 			goto out;
3033 		}
3034 
3035 		switch (lkb->lkb_wait_type) {
3036 		case DLM_MSG_LOOKUP:
3037 		case DLM_MSG_REQUEST:
3038 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3039 			rv = -EBUSY;
3040 			goto out;
3041 		case DLM_MSG_UNLOCK:
3042 			goto out;
3043 		}
3044 		/* add_to_waiters() will set OVERLAP_UNLOCK */
3045 		goto out_ok;
3046 	}
3047 
3048 	/* normal unlock not allowed if there's any op in progress */
3049 	rv = -EBUSY;
3050 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3051 		goto out;
3052 
3053  out_ok:
3054 	/* an overlapping op shouldn't blow away exflags from other op */
3055 	lkb->lkb_exflags |= args->flags;
3056 	lkb->lkb_sbflags = 0;
3057 	lkb->lkb_astparam = args->astparam;
3058 	rv = 0;
3059  out:
3060 	if (rv)
3061 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3062 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3063 			  args->flags, lkb->lkb_wait_type,
3064 			  lkb->lkb_resource->res_name);
3065 	return rv;
3066 }
3067 
3068 /*
3069  * Four stage 4 varieties:
3070  * do_request(), do_convert(), do_unlock(), do_cancel()
3071  * These are called on the master node for the given lock and
3072  * from the central locking logic.
3073  */
3074 
3075 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3076 {
3077 	int error = 0;
3078 
3079 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3080 		grant_lock(r, lkb);
3081 		queue_cast(r, lkb, 0);
3082 		goto out;
3083 	}
3084 
3085 	if (can_be_queued(lkb)) {
3086 		error = -EINPROGRESS;
3087 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3088 		add_timeout(lkb);
3089 		goto out;
3090 	}
3091 
3092 	error = -EAGAIN;
3093 	queue_cast(r, lkb, -EAGAIN);
3094  out:
3095 	return error;
3096 }
3097 
3098 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3099 			       int error)
3100 {
3101 	switch (error) {
3102 	case -EAGAIN:
3103 		if (force_blocking_asts(lkb))
3104 			send_blocking_asts_all(r, lkb);
3105 		break;
3106 	case -EINPROGRESS:
3107 		send_blocking_asts(r, lkb);
3108 		break;
3109 	}
3110 }
3111 
3112 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3113 {
3114 	int error = 0;
3115 	int deadlk = 0;
3116 
3117 	/* changing an existing lock may allow others to be granted */
3118 
3119 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3120 		grant_lock(r, lkb);
3121 		queue_cast(r, lkb, 0);
3122 		goto out;
3123 	}
3124 
3125 	/* can_be_granted() detected that this lock would block in a conversion
3126 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3127 	   the ast for the convert. */
3128 
3129 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3130 		/* it's left on the granted queue */
3131 		revert_lock(r, lkb);
3132 		queue_cast(r, lkb, -EDEADLK);
3133 		error = -EDEADLK;
3134 		goto out;
3135 	}
3136 
3137 	/* is_demoted() means the can_be_granted() above set the grmode
3138 	   to NL, and left us on the granted queue.  This auto-demotion
3139 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3140 	   now grantable.  We have to try to grant other converting locks
3141 	   before we try again to grant this one. */
3142 
3143 	if (is_demoted(lkb)) {
3144 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3145 		if (_can_be_granted(r, lkb, 1, 0)) {
3146 			grant_lock(r, lkb);
3147 			queue_cast(r, lkb, 0);
3148 			goto out;
3149 		}
3150 		/* else fall through and move to convert queue */
3151 	}
3152 
3153 	if (can_be_queued(lkb)) {
3154 		error = -EINPROGRESS;
3155 		del_lkb(r, lkb);
3156 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3157 		add_timeout(lkb);
3158 		goto out;
3159 	}
3160 
3161 	error = -EAGAIN;
3162 	queue_cast(r, lkb, -EAGAIN);
3163  out:
3164 	return error;
3165 }
3166 
3167 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3168 			       int error)
3169 {
3170 	switch (error) {
3171 	case 0:
3172 		grant_pending_locks(r, NULL);
3173 		/* grant_pending_locks also sends basts */
3174 		break;
3175 	case -EAGAIN:
3176 		if (force_blocking_asts(lkb))
3177 			send_blocking_asts_all(r, lkb);
3178 		break;
3179 	case -EINPROGRESS:
3180 		send_blocking_asts(r, lkb);
3181 		break;
3182 	}
3183 }
3184 
3185 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3186 {
3187 	remove_lock(r, lkb);
3188 	queue_cast(r, lkb, -DLM_EUNLOCK);
3189 	return -DLM_EUNLOCK;
3190 }
3191 
3192 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3193 			      int error)
3194 {
3195 	grant_pending_locks(r, NULL);
3196 }
3197 
3198 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3199 
3200 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3201 {
3202 	int error;
3203 
3204 	error = revert_lock(r, lkb);
3205 	if (error) {
3206 		queue_cast(r, lkb, -DLM_ECANCEL);
3207 		return -DLM_ECANCEL;
3208 	}
3209 	return 0;
3210 }
3211 
3212 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3213 			      int error)
3214 {
3215 	if (error)
3216 		grant_pending_locks(r, NULL);
3217 }
3218 
3219 /*
3220  * Four stage 3 varieties:
3221  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3222  */
3223 
3224 /* add a new lkb to a possibly new rsb, called by requesting process */
3225 
3226 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3227 {
3228 	int error;
3229 
3230 	/* set_master: sets lkb nodeid from r */
3231 
3232 	error = set_master(r, lkb);
3233 	if (error < 0)
3234 		goto out;
3235 	if (error) {
3236 		error = 0;
3237 		goto out;
3238 	}
3239 
3240 	if (is_remote(r)) {
3241 		/* receive_request() calls do_request() on remote node */
3242 		error = send_request(r, lkb);
3243 	} else {
3244 		error = do_request(r, lkb);
3245 		/* for remote locks the request_reply is sent
3246 		   between do_request and do_request_effects */
3247 		do_request_effects(r, lkb, error);
3248 	}
3249  out:
3250 	return error;
3251 }
3252 
3253 /* change some property of an existing lkb, e.g. mode */
3254 
3255 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3256 {
3257 	int error;
3258 
3259 	if (is_remote(r)) {
3260 		/* receive_convert() calls do_convert() on remote node */
3261 		error = send_convert(r, lkb);
3262 	} else {
3263 		error = do_convert(r, lkb);
3264 		/* for remote locks the convert_reply is sent
3265 		   between do_convert and do_convert_effects */
3266 		do_convert_effects(r, lkb, error);
3267 	}
3268 
3269 	return error;
3270 }
3271 
3272 /* remove an existing lkb from the granted queue */
3273 
3274 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3275 {
3276 	int error;
3277 
3278 	if (is_remote(r)) {
3279 		/* receive_unlock() calls do_unlock() on remote node */
3280 		error = send_unlock(r, lkb);
3281 	} else {
3282 		error = do_unlock(r, lkb);
3283 		/* for remote locks the unlock_reply is sent
3284 		   between do_unlock and do_unlock_effects */
3285 		do_unlock_effects(r, lkb, error);
3286 	}
3287 
3288 	return error;
3289 }
3290 
3291 /* remove an existing lkb from the convert or wait queue */
3292 
3293 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3294 {
3295 	int error;
3296 
3297 	if (is_remote(r)) {
3298 		/* receive_cancel() calls do_cancel() on remote node */
3299 		error = send_cancel(r, lkb);
3300 	} else {
3301 		error = do_cancel(r, lkb);
3302 		/* for remote locks the cancel_reply is sent
3303 		   between do_cancel and do_cancel_effects */
3304 		do_cancel_effects(r, lkb, error);
3305 	}
3306 
3307 	return error;
3308 }
3309 
3310 /*
3311  * Four stage 2 varieties:
3312  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3313  */
3314 
3315 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3316 			int len, struct dlm_args *args)
3317 {
3318 	struct dlm_rsb *r;
3319 	int error;
3320 
3321 	error = validate_lock_args(ls, lkb, args);
3322 	if (error)
3323 		return error;
3324 
3325 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3326 	if (error)
3327 		return error;
3328 
3329 	lock_rsb(r);
3330 
3331 	attach_lkb(r, lkb);
3332 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3333 
3334 	error = _request_lock(r, lkb);
3335 
3336 	unlock_rsb(r);
3337 	put_rsb(r);
3338 	return error;
3339 }
3340 
3341 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3342 			struct dlm_args *args)
3343 {
3344 	struct dlm_rsb *r;
3345 	int error;
3346 
3347 	r = lkb->lkb_resource;
3348 
3349 	hold_rsb(r);
3350 	lock_rsb(r);
3351 
3352 	error = validate_lock_args(ls, lkb, args);
3353 	if (error)
3354 		goto out;
3355 
3356 	error = _convert_lock(r, lkb);
3357  out:
3358 	unlock_rsb(r);
3359 	put_rsb(r);
3360 	return error;
3361 }
3362 
3363 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3364 		       struct dlm_args *args)
3365 {
3366 	struct dlm_rsb *r;
3367 	int error;
3368 
3369 	r = lkb->lkb_resource;
3370 
3371 	hold_rsb(r);
3372 	lock_rsb(r);
3373 
3374 	error = validate_unlock_args(lkb, args);
3375 	if (error)
3376 		goto out;
3377 
3378 	error = _unlock_lock(r, lkb);
3379  out:
3380 	unlock_rsb(r);
3381 	put_rsb(r);
3382 	return error;
3383 }
3384 
3385 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3386 		       struct dlm_args *args)
3387 {
3388 	struct dlm_rsb *r;
3389 	int error;
3390 
3391 	r = lkb->lkb_resource;
3392 
3393 	hold_rsb(r);
3394 	lock_rsb(r);
3395 
3396 	error = validate_unlock_args(lkb, args);
3397 	if (error)
3398 		goto out;
3399 
3400 	error = _cancel_lock(r, lkb);
3401  out:
3402 	unlock_rsb(r);
3403 	put_rsb(r);
3404 	return error;
3405 }
3406 
3407 /*
3408  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3409  */
3410 
3411 int dlm_lock(dlm_lockspace_t *lockspace,
3412 	     int mode,
3413 	     struct dlm_lksb *lksb,
3414 	     uint32_t flags,
3415 	     void *name,
3416 	     unsigned int namelen,
3417 	     uint32_t parent_lkid,
3418 	     void (*ast) (void *astarg),
3419 	     void *astarg,
3420 	     void (*bast) (void *astarg, int mode))
3421 {
3422 	struct dlm_ls *ls;
3423 	struct dlm_lkb *lkb;
3424 	struct dlm_args args;
3425 	int error, convert = flags & DLM_LKF_CONVERT;
3426 
3427 	ls = dlm_find_lockspace_local(lockspace);
3428 	if (!ls)
3429 		return -EINVAL;
3430 
3431 	dlm_lock_recovery(ls);
3432 
3433 	if (convert)
3434 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3435 	else
3436 		error = create_lkb(ls, &lkb);
3437 
3438 	if (error)
3439 		goto out;
3440 
3441 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3442 			      astarg, bast, &args);
3443 	if (error)
3444 		goto out_put;
3445 
3446 	if (convert)
3447 		error = convert_lock(ls, lkb, &args);
3448 	else
3449 		error = request_lock(ls, lkb, name, namelen, &args);
3450 
3451 	if (error == -EINPROGRESS)
3452 		error = 0;
3453  out_put:
3454 	if (convert || error)
3455 		__put_lkb(ls, lkb);
3456 	if (error == -EAGAIN || error == -EDEADLK)
3457 		error = 0;
3458  out:
3459 	dlm_unlock_recovery(ls);
3460 	dlm_put_lockspace(ls);
3461 	return error;
3462 }
3463 
3464 int dlm_unlock(dlm_lockspace_t *lockspace,
3465 	       uint32_t lkid,
3466 	       uint32_t flags,
3467 	       struct dlm_lksb *lksb,
3468 	       void *astarg)
3469 {
3470 	struct dlm_ls *ls;
3471 	struct dlm_lkb *lkb;
3472 	struct dlm_args args;
3473 	int error;
3474 
3475 	ls = dlm_find_lockspace_local(lockspace);
3476 	if (!ls)
3477 		return -EINVAL;
3478 
3479 	dlm_lock_recovery(ls);
3480 
3481 	error = find_lkb(ls, lkid, &lkb);
3482 	if (error)
3483 		goto out;
3484 
3485 	error = set_unlock_args(flags, astarg, &args);
3486 	if (error)
3487 		goto out_put;
3488 
3489 	if (flags & DLM_LKF_CANCEL)
3490 		error = cancel_lock(ls, lkb, &args);
3491 	else
3492 		error = unlock_lock(ls, lkb, &args);
3493 
3494 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3495 		error = 0;
3496 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3497 		error = 0;
3498  out_put:
3499 	dlm_put_lkb(lkb);
3500  out:
3501 	dlm_unlock_recovery(ls);
3502 	dlm_put_lockspace(ls);
3503 	return error;
3504 }
3505 
3506 /*
3507  * send/receive routines for remote operations and replies
3508  *
3509  * send_args
3510  * send_common
3511  * send_request			receive_request
3512  * send_convert			receive_convert
3513  * send_unlock			receive_unlock
3514  * send_cancel			receive_cancel
3515  * send_grant			receive_grant
3516  * send_bast			receive_bast
3517  * send_lookup			receive_lookup
3518  * send_remove			receive_remove
3519  *
3520  * 				send_common_reply
3521  * receive_request_reply	send_request_reply
3522  * receive_convert_reply	send_convert_reply
3523  * receive_unlock_reply		send_unlock_reply
3524  * receive_cancel_reply		send_cancel_reply
3525  * receive_lookup_reply		send_lookup_reply
3526  */
3527 
3528 static int _create_message(struct dlm_ls *ls, int mb_len,
3529 			   int to_nodeid, int mstype,
3530 			   struct dlm_message **ms_ret,
3531 			   struct dlm_mhandle **mh_ret)
3532 {
3533 	struct dlm_message *ms;
3534 	struct dlm_mhandle *mh;
3535 	char *mb;
3536 
3537 	/* get_buffer gives us a message handle (mh) that we need to
3538 	   pass into lowcomms_commit and a message buffer (mb) that we
3539 	   write our data into */
3540 
3541 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3542 	if (!mh)
3543 		return -ENOBUFS;
3544 
3545 	memset(mb, 0, mb_len);
3546 
3547 	ms = (struct dlm_message *) mb;
3548 
3549 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3550 	ms->m_header.h_lockspace = ls->ls_global_id;
3551 	ms->m_header.h_nodeid = dlm_our_nodeid();
3552 	ms->m_header.h_length = mb_len;
3553 	ms->m_header.h_cmd = DLM_MSG;
3554 
3555 	ms->m_type = mstype;
3556 
3557 	*mh_ret = mh;
3558 	*ms_ret = ms;
3559 	return 0;
3560 }
3561 
3562 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3563 			  int to_nodeid, int mstype,
3564 			  struct dlm_message **ms_ret,
3565 			  struct dlm_mhandle **mh_ret)
3566 {
3567 	int mb_len = sizeof(struct dlm_message);
3568 
3569 	switch (mstype) {
3570 	case DLM_MSG_REQUEST:
3571 	case DLM_MSG_LOOKUP:
3572 	case DLM_MSG_REMOVE:
3573 		mb_len += r->res_length;
3574 		break;
3575 	case DLM_MSG_CONVERT:
3576 	case DLM_MSG_UNLOCK:
3577 	case DLM_MSG_REQUEST_REPLY:
3578 	case DLM_MSG_CONVERT_REPLY:
3579 	case DLM_MSG_GRANT:
3580 		if (lkb && lkb->lkb_lvbptr)
3581 			mb_len += r->res_ls->ls_lvblen;
3582 		break;
3583 	}
3584 
3585 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3586 			       ms_ret, mh_ret);
3587 }
3588 
3589 /* further lowcomms enhancements or alternate implementations may make
3590    the return value from this function useful at some point */
3591 
3592 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3593 {
3594 	dlm_message_out(ms);
3595 	dlm_lowcomms_commit_buffer(mh);
3596 	return 0;
3597 }
3598 
3599 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3600 		      struct dlm_message *ms)
3601 {
3602 	ms->m_nodeid   = lkb->lkb_nodeid;
3603 	ms->m_pid      = lkb->lkb_ownpid;
3604 	ms->m_lkid     = lkb->lkb_id;
3605 	ms->m_remid    = lkb->lkb_remid;
3606 	ms->m_exflags  = lkb->lkb_exflags;
3607 	ms->m_sbflags  = lkb->lkb_sbflags;
3608 	ms->m_flags    = lkb->lkb_flags;
3609 	ms->m_lvbseq   = lkb->lkb_lvbseq;
3610 	ms->m_status   = lkb->lkb_status;
3611 	ms->m_grmode   = lkb->lkb_grmode;
3612 	ms->m_rqmode   = lkb->lkb_rqmode;
3613 	ms->m_hash     = r->res_hash;
3614 
3615 	/* m_result and m_bastmode are set from function args,
3616 	   not from lkb fields */
3617 
3618 	if (lkb->lkb_bastfn)
3619 		ms->m_asts |= DLM_CB_BAST;
3620 	if (lkb->lkb_astfn)
3621 		ms->m_asts |= DLM_CB_CAST;
3622 
3623 	/* compare with switch in create_message; send_remove() doesn't
3624 	   use send_args() */
3625 
3626 	switch (ms->m_type) {
3627 	case DLM_MSG_REQUEST:
3628 	case DLM_MSG_LOOKUP:
3629 		memcpy(ms->m_extra, r->res_name, r->res_length);
3630 		break;
3631 	case DLM_MSG_CONVERT:
3632 	case DLM_MSG_UNLOCK:
3633 	case DLM_MSG_REQUEST_REPLY:
3634 	case DLM_MSG_CONVERT_REPLY:
3635 	case DLM_MSG_GRANT:
3636 		if (!lkb->lkb_lvbptr)
3637 			break;
3638 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3639 		break;
3640 	}
3641 }
3642 
3643 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3644 {
3645 	struct dlm_message *ms;
3646 	struct dlm_mhandle *mh;
3647 	int to_nodeid, error;
3648 
3649 	to_nodeid = r->res_nodeid;
3650 
3651 	error = add_to_waiters(lkb, mstype, to_nodeid);
3652 	if (error)
3653 		return error;
3654 
3655 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3656 	if (error)
3657 		goto fail;
3658 
3659 	send_args(r, lkb, ms);
3660 
3661 	error = send_message(mh, ms);
3662 	if (error)
3663 		goto fail;
3664 	return 0;
3665 
3666  fail:
3667 	remove_from_waiters(lkb, msg_reply_type(mstype));
3668 	return error;
3669 }
3670 
3671 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673 	return send_common(r, lkb, DLM_MSG_REQUEST);
3674 }
3675 
3676 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3677 {
3678 	int error;
3679 
3680 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3681 
3682 	/* down conversions go without a reply from the master */
3683 	if (!error && down_conversion(lkb)) {
3684 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3685 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3686 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3687 		r->res_ls->ls_stub_ms.m_result = 0;
3688 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3689 	}
3690 
3691 	return error;
3692 }
3693 
3694 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3695    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3696    that the master is still correct. */
3697 
3698 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3699 {
3700 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3701 }
3702 
3703 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3704 {
3705 	return send_common(r, lkb, DLM_MSG_CANCEL);
3706 }
3707 
3708 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3709 {
3710 	struct dlm_message *ms;
3711 	struct dlm_mhandle *mh;
3712 	int to_nodeid, error;
3713 
3714 	to_nodeid = lkb->lkb_nodeid;
3715 
3716 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3717 	if (error)
3718 		goto out;
3719 
3720 	send_args(r, lkb, ms);
3721 
3722 	ms->m_result = 0;
3723 
3724 	error = send_message(mh, ms);
3725  out:
3726 	return error;
3727 }
3728 
3729 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3730 {
3731 	struct dlm_message *ms;
3732 	struct dlm_mhandle *mh;
3733 	int to_nodeid, error;
3734 
3735 	to_nodeid = lkb->lkb_nodeid;
3736 
3737 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3738 	if (error)
3739 		goto out;
3740 
3741 	send_args(r, lkb, ms);
3742 
3743 	ms->m_bastmode = mode;
3744 
3745 	error = send_message(mh, ms);
3746  out:
3747 	return error;
3748 }
3749 
3750 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3751 {
3752 	struct dlm_message *ms;
3753 	struct dlm_mhandle *mh;
3754 	int to_nodeid, error;
3755 
3756 	to_nodeid = dlm_dir_nodeid(r);
3757 
3758 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3759 	if (error)
3760 		return error;
3761 
3762 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3763 	if (error)
3764 		goto fail;
3765 
3766 	send_args(r, lkb, ms);
3767 
3768 	error = send_message(mh, ms);
3769 	if (error)
3770 		goto fail;
3771 	return 0;
3772 
3773  fail:
3774 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3775 	return error;
3776 }
3777 
3778 static int send_remove(struct dlm_rsb *r)
3779 {
3780 	struct dlm_message *ms;
3781 	struct dlm_mhandle *mh;
3782 	int to_nodeid, error;
3783 
3784 	to_nodeid = dlm_dir_nodeid(r);
3785 
3786 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3787 	if (error)
3788 		goto out;
3789 
3790 	memcpy(ms->m_extra, r->res_name, r->res_length);
3791 	ms->m_hash = r->res_hash;
3792 
3793 	error = send_message(mh, ms);
3794  out:
3795 	return error;
3796 }
3797 
3798 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3799 			     int mstype, int rv)
3800 {
3801 	struct dlm_message *ms;
3802 	struct dlm_mhandle *mh;
3803 	int to_nodeid, error;
3804 
3805 	to_nodeid = lkb->lkb_nodeid;
3806 
3807 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3808 	if (error)
3809 		goto out;
3810 
3811 	send_args(r, lkb, ms);
3812 
3813 	ms->m_result = rv;
3814 
3815 	error = send_message(mh, ms);
3816  out:
3817 	return error;
3818 }
3819 
3820 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3821 {
3822 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3823 }
3824 
3825 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3826 {
3827 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3828 }
3829 
3830 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3831 {
3832 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3833 }
3834 
3835 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3836 {
3837 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3838 }
3839 
3840 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3841 			     int ret_nodeid, int rv)
3842 {
3843 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3844 	struct dlm_message *ms;
3845 	struct dlm_mhandle *mh;
3846 	int error, nodeid = ms_in->m_header.h_nodeid;
3847 
3848 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3849 	if (error)
3850 		goto out;
3851 
3852 	ms->m_lkid = ms_in->m_lkid;
3853 	ms->m_result = rv;
3854 	ms->m_nodeid = ret_nodeid;
3855 
3856 	error = send_message(mh, ms);
3857  out:
3858 	return error;
3859 }
3860 
3861 /* which args we save from a received message depends heavily on the type
3862    of message, unlike the send side where we can safely send everything about
3863    the lkb for any type of message */
3864 
3865 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3866 {
3867 	lkb->lkb_exflags = ms->m_exflags;
3868 	lkb->lkb_sbflags = ms->m_sbflags;
3869 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3870 		         (ms->m_flags & 0x0000FFFF);
3871 }
3872 
3873 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3874 {
3875 	if (ms->m_flags == DLM_IFL_STUB_MS)
3876 		return;
3877 
3878 	lkb->lkb_sbflags = ms->m_sbflags;
3879 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3880 		         (ms->m_flags & 0x0000FFFF);
3881 }
3882 
3883 static int receive_extralen(struct dlm_message *ms)
3884 {
3885 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3886 }
3887 
3888 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3889 		       struct dlm_message *ms)
3890 {
3891 	int len;
3892 
3893 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3894 		if (!lkb->lkb_lvbptr)
3895 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3896 		if (!lkb->lkb_lvbptr)
3897 			return -ENOMEM;
3898 		len = receive_extralen(ms);
3899 		if (len > ls->ls_lvblen)
3900 			len = ls->ls_lvblen;
3901 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3902 	}
3903 	return 0;
3904 }
3905 
3906 static void fake_bastfn(void *astparam, int mode)
3907 {
3908 	log_print("fake_bastfn should not be called");
3909 }
3910 
3911 static void fake_astfn(void *astparam)
3912 {
3913 	log_print("fake_astfn should not be called");
3914 }
3915 
3916 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3917 				struct dlm_message *ms)
3918 {
3919 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3920 	lkb->lkb_ownpid = ms->m_pid;
3921 	lkb->lkb_remid = ms->m_lkid;
3922 	lkb->lkb_grmode = DLM_LOCK_IV;
3923 	lkb->lkb_rqmode = ms->m_rqmode;
3924 
3925 	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3926 	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3927 
3928 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3929 		/* lkb was just created so there won't be an lvb yet */
3930 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3931 		if (!lkb->lkb_lvbptr)
3932 			return -ENOMEM;
3933 	}
3934 
3935 	return 0;
3936 }
3937 
3938 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3939 				struct dlm_message *ms)
3940 {
3941 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3942 		return -EBUSY;
3943 
3944 	if (receive_lvb(ls, lkb, ms))
3945 		return -ENOMEM;
3946 
3947 	lkb->lkb_rqmode = ms->m_rqmode;
3948 	lkb->lkb_lvbseq = ms->m_lvbseq;
3949 
3950 	return 0;
3951 }
3952 
3953 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3954 			       struct dlm_message *ms)
3955 {
3956 	if (receive_lvb(ls, lkb, ms))
3957 		return -ENOMEM;
3958 	return 0;
3959 }
3960 
3961 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3962    uses to send a reply and that the remote end uses to process the reply. */
3963 
3964 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3965 {
3966 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3967 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3968 	lkb->lkb_remid = ms->m_lkid;
3969 }
3970 
3971 /* This is called after the rsb is locked so that we can safely inspect
3972    fields in the lkb. */
3973 
3974 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3975 {
3976 	int from = ms->m_header.h_nodeid;
3977 	int error = 0;
3978 
3979 	switch (ms->m_type) {
3980 	case DLM_MSG_CONVERT:
3981 	case DLM_MSG_UNLOCK:
3982 	case DLM_MSG_CANCEL:
3983 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3984 			error = -EINVAL;
3985 		break;
3986 
3987 	case DLM_MSG_CONVERT_REPLY:
3988 	case DLM_MSG_UNLOCK_REPLY:
3989 	case DLM_MSG_CANCEL_REPLY:
3990 	case DLM_MSG_GRANT:
3991 	case DLM_MSG_BAST:
3992 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3993 			error = -EINVAL;
3994 		break;
3995 
3996 	case DLM_MSG_REQUEST_REPLY:
3997 		if (!is_process_copy(lkb))
3998 			error = -EINVAL;
3999 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4000 			error = -EINVAL;
4001 		break;
4002 
4003 	default:
4004 		error = -EINVAL;
4005 	}
4006 
4007 	if (error)
4008 		log_error(lkb->lkb_resource->res_ls,
4009 			  "ignore invalid message %d from %d %x %x %x %d",
4010 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4011 			  lkb->lkb_flags, lkb->lkb_nodeid);
4012 	return error;
4013 }
4014 
4015 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4016 {
4017 	char name[DLM_RESNAME_MAXLEN + 1];
4018 	struct dlm_message *ms;
4019 	struct dlm_mhandle *mh;
4020 	struct dlm_rsb *r;
4021 	uint32_t hash, b;
4022 	int rv, dir_nodeid;
4023 
4024 	memset(name, 0, sizeof(name));
4025 	memcpy(name, ms_name, len);
4026 
4027 	hash = jhash(name, len, 0);
4028 	b = hash & (ls->ls_rsbtbl_size - 1);
4029 
4030 	dir_nodeid = dlm_hash2nodeid(ls, hash);
4031 
4032 	log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4033 
4034 	spin_lock(&ls->ls_rsbtbl[b].lock);
4035 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4036 	if (!rv) {
4037 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4038 		log_error(ls, "repeat_remove on keep %s", name);
4039 		return;
4040 	}
4041 
4042 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4043 	if (!rv) {
4044 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4045 		log_error(ls, "repeat_remove on toss %s", name);
4046 		return;
4047 	}
4048 
4049 	/* use ls->remove_name2 to avoid conflict with shrink? */
4050 
4051 	spin_lock(&ls->ls_remove_spin);
4052 	ls->ls_remove_len = len;
4053 	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4054 	spin_unlock(&ls->ls_remove_spin);
4055 	spin_unlock(&ls->ls_rsbtbl[b].lock);
4056 
4057 	rv = _create_message(ls, sizeof(struct dlm_message) + len,
4058 			     dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4059 	if (rv)
4060 		return;
4061 
4062 	memcpy(ms->m_extra, name, len);
4063 	ms->m_hash = hash;
4064 
4065 	send_message(mh, ms);
4066 
4067 	spin_lock(&ls->ls_remove_spin);
4068 	ls->ls_remove_len = 0;
4069 	memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4070 	spin_unlock(&ls->ls_remove_spin);
4071 }
4072 
4073 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4074 {
4075 	struct dlm_lkb *lkb;
4076 	struct dlm_rsb *r;
4077 	int from_nodeid;
4078 	int error, namelen = 0;
4079 
4080 	from_nodeid = ms->m_header.h_nodeid;
4081 
4082 	error = create_lkb(ls, &lkb);
4083 	if (error)
4084 		goto fail;
4085 
4086 	receive_flags(lkb, ms);
4087 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4088 	error = receive_request_args(ls, lkb, ms);
4089 	if (error) {
4090 		__put_lkb(ls, lkb);
4091 		goto fail;
4092 	}
4093 
4094 	/* The dir node is the authority on whether we are the master
4095 	   for this rsb or not, so if the master sends us a request, we should
4096 	   recreate the rsb if we've destroyed it.   This race happens when we
4097 	   send a remove message to the dir node at the same time that the dir
4098 	   node sends us a request for the rsb. */
4099 
4100 	namelen = receive_extralen(ms);
4101 
4102 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4103 			 R_RECEIVE_REQUEST, &r);
4104 	if (error) {
4105 		__put_lkb(ls, lkb);
4106 		goto fail;
4107 	}
4108 
4109 	lock_rsb(r);
4110 
4111 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4112 		error = validate_master_nodeid(ls, r, from_nodeid);
4113 		if (error) {
4114 			unlock_rsb(r);
4115 			put_rsb(r);
4116 			__put_lkb(ls, lkb);
4117 			goto fail;
4118 		}
4119 	}
4120 
4121 	attach_lkb(r, lkb);
4122 	error = do_request(r, lkb);
4123 	send_request_reply(r, lkb, error);
4124 	do_request_effects(r, lkb, error);
4125 
4126 	unlock_rsb(r);
4127 	put_rsb(r);
4128 
4129 	if (error == -EINPROGRESS)
4130 		error = 0;
4131 	if (error)
4132 		dlm_put_lkb(lkb);
4133 	return 0;
4134 
4135  fail:
4136 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4137 	   and do this receive_request again from process_lookup_list once
4138 	   we get the lookup reply.  This would avoid a many repeated
4139 	   ENOTBLK request failures when the lookup reply designating us
4140 	   as master is delayed. */
4141 
4142 	/* We could repeatedly return -EBADR here if our send_remove() is
4143 	   delayed in being sent/arriving/being processed on the dir node.
4144 	   Another node would repeatedly lookup up the master, and the dir
4145 	   node would continue returning our nodeid until our send_remove
4146 	   took effect.
4147 
4148 	   We send another remove message in case our previous send_remove
4149 	   was lost/ignored/missed somehow. */
4150 
4151 	if (error != -ENOTBLK) {
4152 		log_limit(ls, "receive_request %x from %d %d",
4153 			  ms->m_lkid, from_nodeid, error);
4154 	}
4155 
4156 	if (namelen && error == -EBADR) {
4157 		send_repeat_remove(ls, ms->m_extra, namelen);
4158 		msleep(1000);
4159 	}
4160 
4161 	setup_stub_lkb(ls, ms);
4162 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4163 	return error;
4164 }
4165 
4166 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4167 {
4168 	struct dlm_lkb *lkb;
4169 	struct dlm_rsb *r;
4170 	int error, reply = 1;
4171 
4172 	error = find_lkb(ls, ms->m_remid, &lkb);
4173 	if (error)
4174 		goto fail;
4175 
4176 	if (lkb->lkb_remid != ms->m_lkid) {
4177 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4178 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4179 			  (unsigned long long)lkb->lkb_recover_seq,
4180 			  ms->m_header.h_nodeid, ms->m_lkid);
4181 		error = -ENOENT;
4182 		goto fail;
4183 	}
4184 
4185 	r = lkb->lkb_resource;
4186 
4187 	hold_rsb(r);
4188 	lock_rsb(r);
4189 
4190 	error = validate_message(lkb, ms);
4191 	if (error)
4192 		goto out;
4193 
4194 	receive_flags(lkb, ms);
4195 
4196 	error = receive_convert_args(ls, lkb, ms);
4197 	if (error) {
4198 		send_convert_reply(r, lkb, error);
4199 		goto out;
4200 	}
4201 
4202 	reply = !down_conversion(lkb);
4203 
4204 	error = do_convert(r, lkb);
4205 	if (reply)
4206 		send_convert_reply(r, lkb, error);
4207 	do_convert_effects(r, lkb, error);
4208  out:
4209 	unlock_rsb(r);
4210 	put_rsb(r);
4211 	dlm_put_lkb(lkb);
4212 	return 0;
4213 
4214  fail:
4215 	setup_stub_lkb(ls, ms);
4216 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4217 	return error;
4218 }
4219 
4220 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4221 {
4222 	struct dlm_lkb *lkb;
4223 	struct dlm_rsb *r;
4224 	int error;
4225 
4226 	error = find_lkb(ls, ms->m_remid, &lkb);
4227 	if (error)
4228 		goto fail;
4229 
4230 	if (lkb->lkb_remid != ms->m_lkid) {
4231 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4232 			  lkb->lkb_id, lkb->lkb_remid,
4233 			  ms->m_header.h_nodeid, ms->m_lkid);
4234 		error = -ENOENT;
4235 		goto fail;
4236 	}
4237 
4238 	r = lkb->lkb_resource;
4239 
4240 	hold_rsb(r);
4241 	lock_rsb(r);
4242 
4243 	error = validate_message(lkb, ms);
4244 	if (error)
4245 		goto out;
4246 
4247 	receive_flags(lkb, ms);
4248 
4249 	error = receive_unlock_args(ls, lkb, ms);
4250 	if (error) {
4251 		send_unlock_reply(r, lkb, error);
4252 		goto out;
4253 	}
4254 
4255 	error = do_unlock(r, lkb);
4256 	send_unlock_reply(r, lkb, error);
4257 	do_unlock_effects(r, lkb, error);
4258  out:
4259 	unlock_rsb(r);
4260 	put_rsb(r);
4261 	dlm_put_lkb(lkb);
4262 	return 0;
4263 
4264  fail:
4265 	setup_stub_lkb(ls, ms);
4266 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4267 	return error;
4268 }
4269 
4270 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4271 {
4272 	struct dlm_lkb *lkb;
4273 	struct dlm_rsb *r;
4274 	int error;
4275 
4276 	error = find_lkb(ls, ms->m_remid, &lkb);
4277 	if (error)
4278 		goto fail;
4279 
4280 	receive_flags(lkb, ms);
4281 
4282 	r = lkb->lkb_resource;
4283 
4284 	hold_rsb(r);
4285 	lock_rsb(r);
4286 
4287 	error = validate_message(lkb, ms);
4288 	if (error)
4289 		goto out;
4290 
4291 	error = do_cancel(r, lkb);
4292 	send_cancel_reply(r, lkb, error);
4293 	do_cancel_effects(r, lkb, error);
4294  out:
4295 	unlock_rsb(r);
4296 	put_rsb(r);
4297 	dlm_put_lkb(lkb);
4298 	return 0;
4299 
4300  fail:
4301 	setup_stub_lkb(ls, ms);
4302 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4303 	return error;
4304 }
4305 
4306 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4307 {
4308 	struct dlm_lkb *lkb;
4309 	struct dlm_rsb *r;
4310 	int error;
4311 
4312 	error = find_lkb(ls, ms->m_remid, &lkb);
4313 	if (error)
4314 		return error;
4315 
4316 	r = lkb->lkb_resource;
4317 
4318 	hold_rsb(r);
4319 	lock_rsb(r);
4320 
4321 	error = validate_message(lkb, ms);
4322 	if (error)
4323 		goto out;
4324 
4325 	receive_flags_reply(lkb, ms);
4326 	if (is_altmode(lkb))
4327 		munge_altmode(lkb, ms);
4328 	grant_lock_pc(r, lkb, ms);
4329 	queue_cast(r, lkb, 0);
4330  out:
4331 	unlock_rsb(r);
4332 	put_rsb(r);
4333 	dlm_put_lkb(lkb);
4334 	return 0;
4335 }
4336 
4337 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4338 {
4339 	struct dlm_lkb *lkb;
4340 	struct dlm_rsb *r;
4341 	int error;
4342 
4343 	error = find_lkb(ls, ms->m_remid, &lkb);
4344 	if (error)
4345 		return error;
4346 
4347 	r = lkb->lkb_resource;
4348 
4349 	hold_rsb(r);
4350 	lock_rsb(r);
4351 
4352 	error = validate_message(lkb, ms);
4353 	if (error)
4354 		goto out;
4355 
4356 	queue_bast(r, lkb, ms->m_bastmode);
4357 	lkb->lkb_highbast = ms->m_bastmode;
4358  out:
4359 	unlock_rsb(r);
4360 	put_rsb(r);
4361 	dlm_put_lkb(lkb);
4362 	return 0;
4363 }
4364 
4365 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4366 {
4367 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4368 
4369 	from_nodeid = ms->m_header.h_nodeid;
4370 	our_nodeid = dlm_our_nodeid();
4371 
4372 	len = receive_extralen(ms);
4373 
4374 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4375 				  &ret_nodeid, NULL);
4376 
4377 	/* Optimization: we're master so treat lookup as a request */
4378 	if (!error && ret_nodeid == our_nodeid) {
4379 		receive_request(ls, ms);
4380 		return;
4381 	}
4382 	send_lookup_reply(ls, ms, ret_nodeid, error);
4383 }
4384 
4385 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4386 {
4387 	char name[DLM_RESNAME_MAXLEN+1];
4388 	struct dlm_rsb *r;
4389 	uint32_t hash, b;
4390 	int rv, len, dir_nodeid, from_nodeid;
4391 
4392 	from_nodeid = ms->m_header.h_nodeid;
4393 
4394 	len = receive_extralen(ms);
4395 
4396 	if (len > DLM_RESNAME_MAXLEN) {
4397 		log_error(ls, "receive_remove from %d bad len %d",
4398 			  from_nodeid, len);
4399 		return;
4400 	}
4401 
4402 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4403 	if (dir_nodeid != dlm_our_nodeid()) {
4404 		log_error(ls, "receive_remove from %d bad nodeid %d",
4405 			  from_nodeid, dir_nodeid);
4406 		return;
4407 	}
4408 
4409 	/* Look for name on rsbtbl.toss, if it's there, kill it.
4410 	   If it's on rsbtbl.keep, it's being used, and we should ignore this
4411 	   message.  This is an expected race between the dir node sending a
4412 	   request to the master node at the same time as the master node sends
4413 	   a remove to the dir node.  The resolution to that race is for the
4414 	   dir node to ignore the remove message, and the master node to
4415 	   recreate the master rsb when it gets a request from the dir node for
4416 	   an rsb it doesn't have. */
4417 
4418 	memset(name, 0, sizeof(name));
4419 	memcpy(name, ms->m_extra, len);
4420 
4421 	hash = jhash(name, len, 0);
4422 	b = hash & (ls->ls_rsbtbl_size - 1);
4423 
4424 	spin_lock(&ls->ls_rsbtbl[b].lock);
4425 
4426 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4427 	if (rv) {
4428 		/* verify the rsb is on keep list per comment above */
4429 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4430 		if (rv) {
4431 			/* should not happen */
4432 			log_error(ls, "receive_remove from %d not found %s",
4433 				  from_nodeid, name);
4434 			spin_unlock(&ls->ls_rsbtbl[b].lock);
4435 			return;
4436 		}
4437 		if (r->res_master_nodeid != from_nodeid) {
4438 			/* should not happen */
4439 			log_error(ls, "receive_remove keep from %d master %d",
4440 				  from_nodeid, r->res_master_nodeid);
4441 			dlm_print_rsb(r);
4442 			spin_unlock(&ls->ls_rsbtbl[b].lock);
4443 			return;
4444 		}
4445 
4446 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4447 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4448 			  name);
4449 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4450 		return;
4451 	}
4452 
4453 	if (r->res_master_nodeid != from_nodeid) {
4454 		log_error(ls, "receive_remove toss from %d master %d",
4455 			  from_nodeid, r->res_master_nodeid);
4456 		dlm_print_rsb(r);
4457 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4458 		return;
4459 	}
4460 
4461 	if (kref_put(&r->res_ref, kill_rsb)) {
4462 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4463 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4464 		dlm_free_rsb(r);
4465 	} else {
4466 		log_error(ls, "receive_remove from %d rsb ref error",
4467 			  from_nodeid);
4468 		dlm_print_rsb(r);
4469 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4470 	}
4471 }
4472 
4473 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4474 {
4475 	do_purge(ls, ms->m_nodeid, ms->m_pid);
4476 }
4477 
4478 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4479 {
4480 	struct dlm_lkb *lkb;
4481 	struct dlm_rsb *r;
4482 	int error, mstype, result;
4483 	int from_nodeid = ms->m_header.h_nodeid;
4484 
4485 	error = find_lkb(ls, ms->m_remid, &lkb);
4486 	if (error)
4487 		return error;
4488 
4489 	r = lkb->lkb_resource;
4490 	hold_rsb(r);
4491 	lock_rsb(r);
4492 
4493 	error = validate_message(lkb, ms);
4494 	if (error)
4495 		goto out;
4496 
4497 	mstype = lkb->lkb_wait_type;
4498 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4499 	if (error) {
4500 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4501 			  lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4502 		dlm_dump_rsb(r);
4503 		goto out;
4504 	}
4505 
4506 	/* Optimization: the dir node was also the master, so it took our
4507 	   lookup as a request and sent request reply instead of lookup reply */
4508 	if (mstype == DLM_MSG_LOOKUP) {
4509 		r->res_master_nodeid = from_nodeid;
4510 		r->res_nodeid = from_nodeid;
4511 		lkb->lkb_nodeid = from_nodeid;
4512 	}
4513 
4514 	/* this is the value returned from do_request() on the master */
4515 	result = ms->m_result;
4516 
4517 	switch (result) {
4518 	case -EAGAIN:
4519 		/* request would block (be queued) on remote master */
4520 		queue_cast(r, lkb, -EAGAIN);
4521 		confirm_master(r, -EAGAIN);
4522 		unhold_lkb(lkb); /* undoes create_lkb() */
4523 		break;
4524 
4525 	case -EINPROGRESS:
4526 	case 0:
4527 		/* request was queued or granted on remote master */
4528 		receive_flags_reply(lkb, ms);
4529 		lkb->lkb_remid = ms->m_lkid;
4530 		if (is_altmode(lkb))
4531 			munge_altmode(lkb, ms);
4532 		if (result) {
4533 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4534 			add_timeout(lkb);
4535 		} else {
4536 			grant_lock_pc(r, lkb, ms);
4537 			queue_cast(r, lkb, 0);
4538 		}
4539 		confirm_master(r, result);
4540 		break;
4541 
4542 	case -EBADR:
4543 	case -ENOTBLK:
4544 		/* find_rsb failed to find rsb or rsb wasn't master */
4545 		log_limit(ls, "receive_request_reply %x from %d %d "
4546 			  "master %d dir %d first %x %s", lkb->lkb_id,
4547 			  from_nodeid, result, r->res_master_nodeid,
4548 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4549 
4550 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4551 		    r->res_master_nodeid != dlm_our_nodeid()) {
4552 			/* cause _request_lock->set_master->send_lookup */
4553 			r->res_master_nodeid = 0;
4554 			r->res_nodeid = -1;
4555 			lkb->lkb_nodeid = -1;
4556 		}
4557 
4558 		if (is_overlap(lkb)) {
4559 			/* we'll ignore error in cancel/unlock reply */
4560 			queue_cast_overlap(r, lkb);
4561 			confirm_master(r, result);
4562 			unhold_lkb(lkb); /* undoes create_lkb() */
4563 		} else {
4564 			_request_lock(r, lkb);
4565 
4566 			if (r->res_master_nodeid == dlm_our_nodeid())
4567 				confirm_master(r, 0);
4568 		}
4569 		break;
4570 
4571 	default:
4572 		log_error(ls, "receive_request_reply %x error %d",
4573 			  lkb->lkb_id, result);
4574 	}
4575 
4576 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4577 		log_debug(ls, "receive_request_reply %x result %d unlock",
4578 			  lkb->lkb_id, result);
4579 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4580 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4581 		send_unlock(r, lkb);
4582 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4583 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4584 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4585 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4586 		send_cancel(r, lkb);
4587 	} else {
4588 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4589 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4590 	}
4591  out:
4592 	unlock_rsb(r);
4593 	put_rsb(r);
4594 	dlm_put_lkb(lkb);
4595 	return 0;
4596 }
4597 
4598 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4599 				    struct dlm_message *ms)
4600 {
4601 	/* this is the value returned from do_convert() on the master */
4602 	switch (ms->m_result) {
4603 	case -EAGAIN:
4604 		/* convert would block (be queued) on remote master */
4605 		queue_cast(r, lkb, -EAGAIN);
4606 		break;
4607 
4608 	case -EDEADLK:
4609 		receive_flags_reply(lkb, ms);
4610 		revert_lock_pc(r, lkb);
4611 		queue_cast(r, lkb, -EDEADLK);
4612 		break;
4613 
4614 	case -EINPROGRESS:
4615 		/* convert was queued on remote master */
4616 		receive_flags_reply(lkb, ms);
4617 		if (is_demoted(lkb))
4618 			munge_demoted(lkb);
4619 		del_lkb(r, lkb);
4620 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4621 		add_timeout(lkb);
4622 		break;
4623 
4624 	case 0:
4625 		/* convert was granted on remote master */
4626 		receive_flags_reply(lkb, ms);
4627 		if (is_demoted(lkb))
4628 			munge_demoted(lkb);
4629 		grant_lock_pc(r, lkb, ms);
4630 		queue_cast(r, lkb, 0);
4631 		break;
4632 
4633 	default:
4634 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4635 			  lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4636 			  ms->m_result);
4637 		dlm_print_rsb(r);
4638 		dlm_print_lkb(lkb);
4639 	}
4640 }
4641 
4642 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4643 {
4644 	struct dlm_rsb *r = lkb->lkb_resource;
4645 	int error;
4646 
4647 	hold_rsb(r);
4648 	lock_rsb(r);
4649 
4650 	error = validate_message(lkb, ms);
4651 	if (error)
4652 		goto out;
4653 
4654 	/* stub reply can happen with waiters_mutex held */
4655 	error = remove_from_waiters_ms(lkb, ms);
4656 	if (error)
4657 		goto out;
4658 
4659 	__receive_convert_reply(r, lkb, ms);
4660  out:
4661 	unlock_rsb(r);
4662 	put_rsb(r);
4663 }
4664 
4665 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4666 {
4667 	struct dlm_lkb *lkb;
4668 	int error;
4669 
4670 	error = find_lkb(ls, ms->m_remid, &lkb);
4671 	if (error)
4672 		return error;
4673 
4674 	_receive_convert_reply(lkb, ms);
4675 	dlm_put_lkb(lkb);
4676 	return 0;
4677 }
4678 
4679 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4680 {
4681 	struct dlm_rsb *r = lkb->lkb_resource;
4682 	int error;
4683 
4684 	hold_rsb(r);
4685 	lock_rsb(r);
4686 
4687 	error = validate_message(lkb, ms);
4688 	if (error)
4689 		goto out;
4690 
4691 	/* stub reply can happen with waiters_mutex held */
4692 	error = remove_from_waiters_ms(lkb, ms);
4693 	if (error)
4694 		goto out;
4695 
4696 	/* this is the value returned from do_unlock() on the master */
4697 
4698 	switch (ms->m_result) {
4699 	case -DLM_EUNLOCK:
4700 		receive_flags_reply(lkb, ms);
4701 		remove_lock_pc(r, lkb);
4702 		queue_cast(r, lkb, -DLM_EUNLOCK);
4703 		break;
4704 	case -ENOENT:
4705 		break;
4706 	default:
4707 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4708 			  lkb->lkb_id, ms->m_result);
4709 	}
4710  out:
4711 	unlock_rsb(r);
4712 	put_rsb(r);
4713 }
4714 
4715 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4716 {
4717 	struct dlm_lkb *lkb;
4718 	int error;
4719 
4720 	error = find_lkb(ls, ms->m_remid, &lkb);
4721 	if (error)
4722 		return error;
4723 
4724 	_receive_unlock_reply(lkb, ms);
4725 	dlm_put_lkb(lkb);
4726 	return 0;
4727 }
4728 
4729 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4730 {
4731 	struct dlm_rsb *r = lkb->lkb_resource;
4732 	int error;
4733 
4734 	hold_rsb(r);
4735 	lock_rsb(r);
4736 
4737 	error = validate_message(lkb, ms);
4738 	if (error)
4739 		goto out;
4740 
4741 	/* stub reply can happen with waiters_mutex held */
4742 	error = remove_from_waiters_ms(lkb, ms);
4743 	if (error)
4744 		goto out;
4745 
4746 	/* this is the value returned from do_cancel() on the master */
4747 
4748 	switch (ms->m_result) {
4749 	case -DLM_ECANCEL:
4750 		receive_flags_reply(lkb, ms);
4751 		revert_lock_pc(r, lkb);
4752 		queue_cast(r, lkb, -DLM_ECANCEL);
4753 		break;
4754 	case 0:
4755 		break;
4756 	default:
4757 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4758 			  lkb->lkb_id, ms->m_result);
4759 	}
4760  out:
4761 	unlock_rsb(r);
4762 	put_rsb(r);
4763 }
4764 
4765 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4766 {
4767 	struct dlm_lkb *lkb;
4768 	int error;
4769 
4770 	error = find_lkb(ls, ms->m_remid, &lkb);
4771 	if (error)
4772 		return error;
4773 
4774 	_receive_cancel_reply(lkb, ms);
4775 	dlm_put_lkb(lkb);
4776 	return 0;
4777 }
4778 
4779 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4780 {
4781 	struct dlm_lkb *lkb;
4782 	struct dlm_rsb *r;
4783 	int error, ret_nodeid;
4784 	int do_lookup_list = 0;
4785 
4786 	error = find_lkb(ls, ms->m_lkid, &lkb);
4787 	if (error) {
4788 		log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4789 		return;
4790 	}
4791 
4792 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4793 	   FIXME: will a non-zero error ever be returned? */
4794 
4795 	r = lkb->lkb_resource;
4796 	hold_rsb(r);
4797 	lock_rsb(r);
4798 
4799 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4800 	if (error)
4801 		goto out;
4802 
4803 	ret_nodeid = ms->m_nodeid;
4804 
4805 	/* We sometimes receive a request from the dir node for this
4806 	   rsb before we've received the dir node's loookup_reply for it.
4807 	   The request from the dir node implies we're the master, so we set
4808 	   ourself as master in receive_request_reply, and verify here that
4809 	   we are indeed the master. */
4810 
4811 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4812 		/* This should never happen */
4813 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4814 			  "master %d dir %d our %d first %x %s",
4815 			  lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4816 			  r->res_master_nodeid, r->res_dir_nodeid,
4817 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4818 	}
4819 
4820 	if (ret_nodeid == dlm_our_nodeid()) {
4821 		r->res_master_nodeid = ret_nodeid;
4822 		r->res_nodeid = 0;
4823 		do_lookup_list = 1;
4824 		r->res_first_lkid = 0;
4825 	} else if (ret_nodeid == -1) {
4826 		/* the remote node doesn't believe it's the dir node */
4827 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4828 			  lkb->lkb_id, ms->m_header.h_nodeid);
4829 		r->res_master_nodeid = 0;
4830 		r->res_nodeid = -1;
4831 		lkb->lkb_nodeid = -1;
4832 	} else {
4833 		/* set_master() will set lkb_nodeid from r */
4834 		r->res_master_nodeid = ret_nodeid;
4835 		r->res_nodeid = ret_nodeid;
4836 	}
4837 
4838 	if (is_overlap(lkb)) {
4839 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4840 			  lkb->lkb_id, lkb->lkb_flags);
4841 		queue_cast_overlap(r, lkb);
4842 		unhold_lkb(lkb); /* undoes create_lkb() */
4843 		goto out_list;
4844 	}
4845 
4846 	_request_lock(r, lkb);
4847 
4848  out_list:
4849 	if (do_lookup_list)
4850 		process_lookup_list(r);
4851  out:
4852 	unlock_rsb(r);
4853 	put_rsb(r);
4854 	dlm_put_lkb(lkb);
4855 }
4856 
4857 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4858 			     uint32_t saved_seq)
4859 {
4860 	int error = 0, noent = 0;
4861 
4862 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4863 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4864 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4865 			  ms->m_remid, ms->m_result);
4866 		return;
4867 	}
4868 
4869 	switch (ms->m_type) {
4870 
4871 	/* messages sent to a master node */
4872 
4873 	case DLM_MSG_REQUEST:
4874 		error = receive_request(ls, ms);
4875 		break;
4876 
4877 	case DLM_MSG_CONVERT:
4878 		error = receive_convert(ls, ms);
4879 		break;
4880 
4881 	case DLM_MSG_UNLOCK:
4882 		error = receive_unlock(ls, ms);
4883 		break;
4884 
4885 	case DLM_MSG_CANCEL:
4886 		noent = 1;
4887 		error = receive_cancel(ls, ms);
4888 		break;
4889 
4890 	/* messages sent from a master node (replies to above) */
4891 
4892 	case DLM_MSG_REQUEST_REPLY:
4893 		error = receive_request_reply(ls, ms);
4894 		break;
4895 
4896 	case DLM_MSG_CONVERT_REPLY:
4897 		error = receive_convert_reply(ls, ms);
4898 		break;
4899 
4900 	case DLM_MSG_UNLOCK_REPLY:
4901 		error = receive_unlock_reply(ls, ms);
4902 		break;
4903 
4904 	case DLM_MSG_CANCEL_REPLY:
4905 		error = receive_cancel_reply(ls, ms);
4906 		break;
4907 
4908 	/* messages sent from a master node (only two types of async msg) */
4909 
4910 	case DLM_MSG_GRANT:
4911 		noent = 1;
4912 		error = receive_grant(ls, ms);
4913 		break;
4914 
4915 	case DLM_MSG_BAST:
4916 		noent = 1;
4917 		error = receive_bast(ls, ms);
4918 		break;
4919 
4920 	/* messages sent to a dir node */
4921 
4922 	case DLM_MSG_LOOKUP:
4923 		receive_lookup(ls, ms);
4924 		break;
4925 
4926 	case DLM_MSG_REMOVE:
4927 		receive_remove(ls, ms);
4928 		break;
4929 
4930 	/* messages sent from a dir node (remove has no reply) */
4931 
4932 	case DLM_MSG_LOOKUP_REPLY:
4933 		receive_lookup_reply(ls, ms);
4934 		break;
4935 
4936 	/* other messages */
4937 
4938 	case DLM_MSG_PURGE:
4939 		receive_purge(ls, ms);
4940 		break;
4941 
4942 	default:
4943 		log_error(ls, "unknown message type %d", ms->m_type);
4944 	}
4945 
4946 	/*
4947 	 * When checking for ENOENT, we're checking the result of
4948 	 * find_lkb(m_remid):
4949 	 *
4950 	 * The lock id referenced in the message wasn't found.  This may
4951 	 * happen in normal usage for the async messages and cancel, so
4952 	 * only use log_debug for them.
4953 	 *
4954 	 * Some errors are expected and normal.
4955 	 */
4956 
4957 	if (error == -ENOENT && noent) {
4958 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4959 			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4960 			  ms->m_lkid, saved_seq);
4961 	} else if (error == -ENOENT) {
4962 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4963 			  ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4964 			  ms->m_lkid, saved_seq);
4965 
4966 		if (ms->m_type == DLM_MSG_CONVERT)
4967 			dlm_dump_rsb_hash(ls, ms->m_hash);
4968 	}
4969 
4970 	if (error == -EINVAL) {
4971 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4972 			  "saved_seq %u",
4973 			  ms->m_type, ms->m_header.h_nodeid,
4974 			  ms->m_lkid, ms->m_remid, saved_seq);
4975 	}
4976 }
4977 
4978 /* If the lockspace is in recovery mode (locking stopped), then normal
4979    messages are saved on the requestqueue for processing after recovery is
4980    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4981    messages off the requestqueue before we process new ones. This occurs right
4982    after recovery completes when we transition from saving all messages on
4983    requestqueue, to processing all the saved messages, to processing new
4984    messages as they arrive. */
4985 
4986 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4987 				int nodeid)
4988 {
4989 	if (dlm_locking_stopped(ls)) {
4990 		/* If we were a member of this lockspace, left, and rejoined,
4991 		   other nodes may still be sending us messages from the
4992 		   lockspace generation before we left. */
4993 		if (!ls->ls_generation) {
4994 			log_limit(ls, "receive %d from %d ignore old gen",
4995 				  ms->m_type, nodeid);
4996 			return;
4997 		}
4998 
4999 		dlm_add_requestqueue(ls, nodeid, ms);
5000 	} else {
5001 		dlm_wait_requestqueue(ls);
5002 		_receive_message(ls, ms, 0);
5003 	}
5004 }
5005 
5006 /* This is called by dlm_recoverd to process messages that were saved on
5007    the requestqueue. */
5008 
5009 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5010 			       uint32_t saved_seq)
5011 {
5012 	_receive_message(ls, ms, saved_seq);
5013 }
5014 
5015 /* This is called by the midcomms layer when something is received for
5016    the lockspace.  It could be either a MSG (normal message sent as part of
5017    standard locking activity) or an RCOM (recovery message sent as part of
5018    lockspace recovery). */
5019 
5020 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5021 {
5022 	struct dlm_header *hd = &p->header;
5023 	struct dlm_ls *ls;
5024 	int type = 0;
5025 
5026 	switch (hd->h_cmd) {
5027 	case DLM_MSG:
5028 		dlm_message_in(&p->message);
5029 		type = p->message.m_type;
5030 		break;
5031 	case DLM_RCOM:
5032 		dlm_rcom_in(&p->rcom);
5033 		type = p->rcom.rc_type;
5034 		break;
5035 	default:
5036 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5037 		return;
5038 	}
5039 
5040 	if (hd->h_nodeid != nodeid) {
5041 		log_print("invalid h_nodeid %d from %d lockspace %x",
5042 			  hd->h_nodeid, nodeid, hd->h_lockspace);
5043 		return;
5044 	}
5045 
5046 	ls = dlm_find_lockspace_global(hd->h_lockspace);
5047 	if (!ls) {
5048 		if (dlm_config.ci_log_debug) {
5049 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5050 				"%u from %d cmd %d type %d\n",
5051 				hd->h_lockspace, nodeid, hd->h_cmd, type);
5052 		}
5053 
5054 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5055 			dlm_send_ls_not_ready(nodeid, &p->rcom);
5056 		return;
5057 	}
5058 
5059 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5060 	   be inactive (in this ls) before transitioning to recovery mode */
5061 
5062 	down_read(&ls->ls_recv_active);
5063 	if (hd->h_cmd == DLM_MSG)
5064 		dlm_receive_message(ls, &p->message, nodeid);
5065 	else
5066 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5067 	up_read(&ls->ls_recv_active);
5068 
5069 	dlm_put_lockspace(ls);
5070 }
5071 
5072 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5073 				   struct dlm_message *ms_stub)
5074 {
5075 	if (middle_conversion(lkb)) {
5076 		hold_lkb(lkb);
5077 		memset(ms_stub, 0, sizeof(struct dlm_message));
5078 		ms_stub->m_flags = DLM_IFL_STUB_MS;
5079 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5080 		ms_stub->m_result = -EINPROGRESS;
5081 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5082 		_receive_convert_reply(lkb, ms_stub);
5083 
5084 		/* Same special case as in receive_rcom_lock_args() */
5085 		lkb->lkb_grmode = DLM_LOCK_IV;
5086 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5087 		unhold_lkb(lkb);
5088 
5089 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5090 		lkb->lkb_flags |= DLM_IFL_RESEND;
5091 	}
5092 
5093 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5094 	   conversions are async; there's no reply from the remote master */
5095 }
5096 
5097 /* A waiting lkb needs recovery if the master node has failed, or
5098    the master node is changing (only when no directory is used) */
5099 
5100 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5101 				 int dir_nodeid)
5102 {
5103 	if (dlm_no_directory(ls))
5104 		return 1;
5105 
5106 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5107 		return 1;
5108 
5109 	return 0;
5110 }
5111 
5112 /* Recovery for locks that are waiting for replies from nodes that are now
5113    gone.  We can just complete unlocks and cancels by faking a reply from the
5114    dead node.  Requests and up-conversions we flag to be resent after
5115    recovery.  Down-conversions can just be completed with a fake reply like
5116    unlocks.  Conversions between PR and CW need special attention. */
5117 
5118 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5119 {
5120 	struct dlm_lkb *lkb, *safe;
5121 	struct dlm_message *ms_stub;
5122 	int wait_type, stub_unlock_result, stub_cancel_result;
5123 	int dir_nodeid;
5124 
5125 	ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5126 	if (!ms_stub)
5127 		return;
5128 
5129 	mutex_lock(&ls->ls_waiters_mutex);
5130 
5131 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5132 
5133 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5134 
5135 		/* exclude debug messages about unlocks because there can be so
5136 		   many and they aren't very interesting */
5137 
5138 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5139 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5140 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5141 				  lkb->lkb_id,
5142 				  lkb->lkb_remid,
5143 				  lkb->lkb_wait_type,
5144 				  lkb->lkb_resource->res_nodeid,
5145 				  lkb->lkb_nodeid,
5146 				  lkb->lkb_wait_nodeid,
5147 				  dir_nodeid);
5148 		}
5149 
5150 		/* all outstanding lookups, regardless of destination  will be
5151 		   resent after recovery is done */
5152 
5153 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5154 			lkb->lkb_flags |= DLM_IFL_RESEND;
5155 			continue;
5156 		}
5157 
5158 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5159 			continue;
5160 
5161 		wait_type = lkb->lkb_wait_type;
5162 		stub_unlock_result = -DLM_EUNLOCK;
5163 		stub_cancel_result = -DLM_ECANCEL;
5164 
5165 		/* Main reply may have been received leaving a zero wait_type,
5166 		   but a reply for the overlapping op may not have been
5167 		   received.  In that case we need to fake the appropriate
5168 		   reply for the overlap op. */
5169 
5170 		if (!wait_type) {
5171 			if (is_overlap_cancel(lkb)) {
5172 				wait_type = DLM_MSG_CANCEL;
5173 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5174 					stub_cancel_result = 0;
5175 			}
5176 			if (is_overlap_unlock(lkb)) {
5177 				wait_type = DLM_MSG_UNLOCK;
5178 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5179 					stub_unlock_result = -ENOENT;
5180 			}
5181 
5182 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5183 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
5184 				  stub_cancel_result, stub_unlock_result);
5185 		}
5186 
5187 		switch (wait_type) {
5188 
5189 		case DLM_MSG_REQUEST:
5190 			lkb->lkb_flags |= DLM_IFL_RESEND;
5191 			break;
5192 
5193 		case DLM_MSG_CONVERT:
5194 			recover_convert_waiter(ls, lkb, ms_stub);
5195 			break;
5196 
5197 		case DLM_MSG_UNLOCK:
5198 			hold_lkb(lkb);
5199 			memset(ms_stub, 0, sizeof(struct dlm_message));
5200 			ms_stub->m_flags = DLM_IFL_STUB_MS;
5201 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5202 			ms_stub->m_result = stub_unlock_result;
5203 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5204 			_receive_unlock_reply(lkb, ms_stub);
5205 			dlm_put_lkb(lkb);
5206 			break;
5207 
5208 		case DLM_MSG_CANCEL:
5209 			hold_lkb(lkb);
5210 			memset(ms_stub, 0, sizeof(struct dlm_message));
5211 			ms_stub->m_flags = DLM_IFL_STUB_MS;
5212 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5213 			ms_stub->m_result = stub_cancel_result;
5214 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5215 			_receive_cancel_reply(lkb, ms_stub);
5216 			dlm_put_lkb(lkb);
5217 			break;
5218 
5219 		default:
5220 			log_error(ls, "invalid lkb wait_type %d %d",
5221 				  lkb->lkb_wait_type, wait_type);
5222 		}
5223 		schedule();
5224 	}
5225 	mutex_unlock(&ls->ls_waiters_mutex);
5226 	kfree(ms_stub);
5227 }
5228 
5229 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5230 {
5231 	struct dlm_lkb *lkb;
5232 	int found = 0;
5233 
5234 	mutex_lock(&ls->ls_waiters_mutex);
5235 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5236 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
5237 			hold_lkb(lkb);
5238 			found = 1;
5239 			break;
5240 		}
5241 	}
5242 	mutex_unlock(&ls->ls_waiters_mutex);
5243 
5244 	if (!found)
5245 		lkb = NULL;
5246 	return lkb;
5247 }
5248 
5249 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5250    master or dir-node for r.  Processing the lkb may result in it being placed
5251    back on waiters. */
5252 
5253 /* We do this after normal locking has been enabled and any saved messages
5254    (in requestqueue) have been processed.  We should be confident that at
5255    this point we won't get or process a reply to any of these waiting
5256    operations.  But, new ops may be coming in on the rsbs/locks here from
5257    userspace or remotely. */
5258 
5259 /* there may have been an overlap unlock/cancel prior to recovery or after
5260    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5261    overlap flag would just have been set and nothing new sent.  we can be
5262    confident here than any replies to either the initial op or overlap ops
5263    prior to recovery have been received. */
5264 
5265 int dlm_recover_waiters_post(struct dlm_ls *ls)
5266 {
5267 	struct dlm_lkb *lkb;
5268 	struct dlm_rsb *r;
5269 	int error = 0, mstype, err, oc, ou;
5270 
5271 	while (1) {
5272 		if (dlm_locking_stopped(ls)) {
5273 			log_debug(ls, "recover_waiters_post aborted");
5274 			error = -EINTR;
5275 			break;
5276 		}
5277 
5278 		lkb = find_resend_waiter(ls);
5279 		if (!lkb)
5280 			break;
5281 
5282 		r = lkb->lkb_resource;
5283 		hold_rsb(r);
5284 		lock_rsb(r);
5285 
5286 		mstype = lkb->lkb_wait_type;
5287 		oc = is_overlap_cancel(lkb);
5288 		ou = is_overlap_unlock(lkb);
5289 		err = 0;
5290 
5291 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5292 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5293 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5294 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5295 			  dlm_dir_nodeid(r), oc, ou);
5296 
5297 		/* At this point we assume that we won't get a reply to any
5298 		   previous op or overlap op on this lock.  First, do a big
5299 		   remove_from_waiters() for all previous ops. */
5300 
5301 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
5302 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5303 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5304 		lkb->lkb_wait_type = 0;
5305 		lkb->lkb_wait_count = 0;
5306 		mutex_lock(&ls->ls_waiters_mutex);
5307 		list_del_init(&lkb->lkb_wait_reply);
5308 		mutex_unlock(&ls->ls_waiters_mutex);
5309 		unhold_lkb(lkb); /* for waiters list */
5310 
5311 		if (oc || ou) {
5312 			/* do an unlock or cancel instead of resending */
5313 			switch (mstype) {
5314 			case DLM_MSG_LOOKUP:
5315 			case DLM_MSG_REQUEST:
5316 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5317 							-DLM_ECANCEL);
5318 				unhold_lkb(lkb); /* undoes create_lkb() */
5319 				break;
5320 			case DLM_MSG_CONVERT:
5321 				if (oc) {
5322 					queue_cast(r, lkb, -DLM_ECANCEL);
5323 				} else {
5324 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5325 					_unlock_lock(r, lkb);
5326 				}
5327 				break;
5328 			default:
5329 				err = 1;
5330 			}
5331 		} else {
5332 			switch (mstype) {
5333 			case DLM_MSG_LOOKUP:
5334 			case DLM_MSG_REQUEST:
5335 				_request_lock(r, lkb);
5336 				if (is_master(r))
5337 					confirm_master(r, 0);
5338 				break;
5339 			case DLM_MSG_CONVERT:
5340 				_convert_lock(r, lkb);
5341 				break;
5342 			default:
5343 				err = 1;
5344 			}
5345 		}
5346 
5347 		if (err) {
5348 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5349 				  "dir_nodeid %d overlap %d %d",
5350 				  lkb->lkb_id, mstype, r->res_nodeid,
5351 				  dlm_dir_nodeid(r), oc, ou);
5352 		}
5353 		unlock_rsb(r);
5354 		put_rsb(r);
5355 		dlm_put_lkb(lkb);
5356 	}
5357 
5358 	return error;
5359 }
5360 
5361 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5362 			      struct list_head *list)
5363 {
5364 	struct dlm_lkb *lkb, *safe;
5365 
5366 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5367 		if (!is_master_copy(lkb))
5368 			continue;
5369 
5370 		/* don't purge lkbs we've added in recover_master_copy for
5371 		   the current recovery seq */
5372 
5373 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5374 			continue;
5375 
5376 		del_lkb(r, lkb);
5377 
5378 		/* this put should free the lkb */
5379 		if (!dlm_put_lkb(lkb))
5380 			log_error(ls, "purged mstcpy lkb not released");
5381 	}
5382 }
5383 
5384 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5385 {
5386 	struct dlm_ls *ls = r->res_ls;
5387 
5388 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5389 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5390 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5391 }
5392 
5393 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5394 			    struct list_head *list,
5395 			    int nodeid_gone, unsigned int *count)
5396 {
5397 	struct dlm_lkb *lkb, *safe;
5398 
5399 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5400 		if (!is_master_copy(lkb))
5401 			continue;
5402 
5403 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5404 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5405 
5406 			/* tell recover_lvb to invalidate the lvb
5407 			   because a node holding EX/PW failed */
5408 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5409 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5410 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5411 			}
5412 
5413 			del_lkb(r, lkb);
5414 
5415 			/* this put should free the lkb */
5416 			if (!dlm_put_lkb(lkb))
5417 				log_error(ls, "purged dead lkb not released");
5418 
5419 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5420 
5421 			(*count)++;
5422 		}
5423 	}
5424 }
5425 
5426 /* Get rid of locks held by nodes that are gone. */
5427 
5428 void dlm_recover_purge(struct dlm_ls *ls)
5429 {
5430 	struct dlm_rsb *r;
5431 	struct dlm_member *memb;
5432 	int nodes_count = 0;
5433 	int nodeid_gone = 0;
5434 	unsigned int lkb_count = 0;
5435 
5436 	/* cache one removed nodeid to optimize the common
5437 	   case of a single node removed */
5438 
5439 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5440 		nodes_count++;
5441 		nodeid_gone = memb->nodeid;
5442 	}
5443 
5444 	if (!nodes_count)
5445 		return;
5446 
5447 	down_write(&ls->ls_root_sem);
5448 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5449 		hold_rsb(r);
5450 		lock_rsb(r);
5451 		if (is_master(r)) {
5452 			purge_dead_list(ls, r, &r->res_grantqueue,
5453 					nodeid_gone, &lkb_count);
5454 			purge_dead_list(ls, r, &r->res_convertqueue,
5455 					nodeid_gone, &lkb_count);
5456 			purge_dead_list(ls, r, &r->res_waitqueue,
5457 					nodeid_gone, &lkb_count);
5458 		}
5459 		unlock_rsb(r);
5460 		unhold_rsb(r);
5461 		cond_resched();
5462 	}
5463 	up_write(&ls->ls_root_sem);
5464 
5465 	if (lkb_count)
5466 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5467 			  lkb_count, nodes_count);
5468 }
5469 
5470 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5471 {
5472 	struct rb_node *n;
5473 	struct dlm_rsb *r;
5474 
5475 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
5476 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5477 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
5478 
5479 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5480 			continue;
5481 		if (!is_master(r)) {
5482 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5483 			continue;
5484 		}
5485 		hold_rsb(r);
5486 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5487 		return r;
5488 	}
5489 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5490 	return NULL;
5491 }
5492 
5493 /*
5494  * Attempt to grant locks on resources that we are the master of.
5495  * Locks may have become grantable during recovery because locks
5496  * from departed nodes have been purged (or not rebuilt), allowing
5497  * previously blocked locks to now be granted.  The subset of rsb's
5498  * we are interested in are those with lkb's on either the convert or
5499  * waiting queues.
5500  *
5501  * Simplest would be to go through each master rsb and check for non-empty
5502  * convert or waiting queues, and attempt to grant on those rsbs.
5503  * Checking the queues requires lock_rsb, though, for which we'd need
5504  * to release the rsbtbl lock.  This would make iterating through all
5505  * rsb's very inefficient.  So, we rely on earlier recovery routines
5506  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5507  * locks for.
5508  */
5509 
5510 void dlm_recover_grant(struct dlm_ls *ls)
5511 {
5512 	struct dlm_rsb *r;
5513 	int bucket = 0;
5514 	unsigned int count = 0;
5515 	unsigned int rsb_count = 0;
5516 	unsigned int lkb_count = 0;
5517 
5518 	while (1) {
5519 		r = find_grant_rsb(ls, bucket);
5520 		if (!r) {
5521 			if (bucket == ls->ls_rsbtbl_size - 1)
5522 				break;
5523 			bucket++;
5524 			continue;
5525 		}
5526 		rsb_count++;
5527 		count = 0;
5528 		lock_rsb(r);
5529 		/* the RECOVER_GRANT flag is checked in the grant path */
5530 		grant_pending_locks(r, &count);
5531 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5532 		lkb_count += count;
5533 		confirm_master(r, 0);
5534 		unlock_rsb(r);
5535 		put_rsb(r);
5536 		cond_resched();
5537 	}
5538 
5539 	if (lkb_count)
5540 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5541 			  lkb_count, rsb_count);
5542 }
5543 
5544 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5545 					 uint32_t remid)
5546 {
5547 	struct dlm_lkb *lkb;
5548 
5549 	list_for_each_entry(lkb, head, lkb_statequeue) {
5550 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5551 			return lkb;
5552 	}
5553 	return NULL;
5554 }
5555 
5556 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5557 				    uint32_t remid)
5558 {
5559 	struct dlm_lkb *lkb;
5560 
5561 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5562 	if (lkb)
5563 		return lkb;
5564 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5565 	if (lkb)
5566 		return lkb;
5567 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5568 	if (lkb)
5569 		return lkb;
5570 	return NULL;
5571 }
5572 
5573 /* needs at least dlm_rcom + rcom_lock */
5574 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5575 				  struct dlm_rsb *r, struct dlm_rcom *rc)
5576 {
5577 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5578 
5579 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5580 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5581 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5582 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5583 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5584 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
5585 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5586 	lkb->lkb_rqmode = rl->rl_rqmode;
5587 	lkb->lkb_grmode = rl->rl_grmode;
5588 	/* don't set lkb_status because add_lkb wants to itself */
5589 
5590 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5591 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5592 
5593 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5594 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5595 			 sizeof(struct rcom_lock);
5596 		if (lvblen > ls->ls_lvblen)
5597 			return -EINVAL;
5598 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5599 		if (!lkb->lkb_lvbptr)
5600 			return -ENOMEM;
5601 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5602 	}
5603 
5604 	/* Conversions between PR and CW (middle modes) need special handling.
5605 	   The real granted mode of these converting locks cannot be determined
5606 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5607 
5608 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5609 	    middle_conversion(lkb)) {
5610 		rl->rl_status = DLM_LKSTS_CONVERT;
5611 		lkb->lkb_grmode = DLM_LOCK_IV;
5612 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5613 	}
5614 
5615 	return 0;
5616 }
5617 
5618 /* This lkb may have been recovered in a previous aborted recovery so we need
5619    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5620    If so we just send back a standard reply.  If not, we create a new lkb with
5621    the given values and send back our lkid.  We send back our lkid by sending
5622    back the rcom_lock struct we got but with the remid field filled in. */
5623 
5624 /* needs at least dlm_rcom + rcom_lock */
5625 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5626 {
5627 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5628 	struct dlm_rsb *r;
5629 	struct dlm_lkb *lkb;
5630 	uint32_t remid = 0;
5631 	int from_nodeid = rc->rc_header.h_nodeid;
5632 	int error;
5633 
5634 	if (rl->rl_parent_lkid) {
5635 		error = -EOPNOTSUPP;
5636 		goto out;
5637 	}
5638 
5639 	remid = le32_to_cpu(rl->rl_lkid);
5640 
5641 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5642 	   have to require it.  Recovery of masters on one node can overlap
5643 	   recovery of locks on another node, so one node can send us MSTCPY
5644 	   locks before we've made ourselves master of this rsb.  We can still
5645 	   add new MSTCPY locks that we receive here without any harm; when
5646 	   we make ourselves master, dlm_recover_masters() won't touch the
5647 	   MSTCPY locks we've received early. */
5648 
5649 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5650 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5651 	if (error)
5652 		goto out;
5653 
5654 	lock_rsb(r);
5655 
5656 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5657 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5658 			  from_nodeid, remid);
5659 		error = -EBADR;
5660 		goto out_unlock;
5661 	}
5662 
5663 	lkb = search_remid(r, from_nodeid, remid);
5664 	if (lkb) {
5665 		error = -EEXIST;
5666 		goto out_remid;
5667 	}
5668 
5669 	error = create_lkb(ls, &lkb);
5670 	if (error)
5671 		goto out_unlock;
5672 
5673 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5674 	if (error) {
5675 		__put_lkb(ls, lkb);
5676 		goto out_unlock;
5677 	}
5678 
5679 	attach_lkb(r, lkb);
5680 	add_lkb(r, lkb, rl->rl_status);
5681 	error = 0;
5682 	ls->ls_recover_locks_in++;
5683 
5684 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5685 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5686 
5687  out_remid:
5688 	/* this is the new value returned to the lock holder for
5689 	   saving in its process-copy lkb */
5690 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5691 
5692 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5693 
5694  out_unlock:
5695 	unlock_rsb(r);
5696 	put_rsb(r);
5697  out:
5698 	if (error && error != -EEXIST)
5699 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5700 			  from_nodeid, remid, error);
5701 	rl->rl_result = cpu_to_le32(error);
5702 	return error;
5703 }
5704 
5705 /* needs at least dlm_rcom + rcom_lock */
5706 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5707 {
5708 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5709 	struct dlm_rsb *r;
5710 	struct dlm_lkb *lkb;
5711 	uint32_t lkid, remid;
5712 	int error, result;
5713 
5714 	lkid = le32_to_cpu(rl->rl_lkid);
5715 	remid = le32_to_cpu(rl->rl_remid);
5716 	result = le32_to_cpu(rl->rl_result);
5717 
5718 	error = find_lkb(ls, lkid, &lkb);
5719 	if (error) {
5720 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5721 			  lkid, rc->rc_header.h_nodeid, remid, result);
5722 		return error;
5723 	}
5724 
5725 	r = lkb->lkb_resource;
5726 	hold_rsb(r);
5727 	lock_rsb(r);
5728 
5729 	if (!is_process_copy(lkb)) {
5730 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5731 			  lkid, rc->rc_header.h_nodeid, remid, result);
5732 		dlm_dump_rsb(r);
5733 		unlock_rsb(r);
5734 		put_rsb(r);
5735 		dlm_put_lkb(lkb);
5736 		return -EINVAL;
5737 	}
5738 
5739 	switch (result) {
5740 	case -EBADR:
5741 		/* There's a chance the new master received our lock before
5742 		   dlm_recover_master_reply(), this wouldn't happen if we did
5743 		   a barrier between recover_masters and recover_locks. */
5744 
5745 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5746 			  lkid, rc->rc_header.h_nodeid, remid, result);
5747 
5748 		dlm_send_rcom_lock(r, lkb);
5749 		goto out;
5750 	case -EEXIST:
5751 	case 0:
5752 		lkb->lkb_remid = remid;
5753 		break;
5754 	default:
5755 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5756 			  lkid, rc->rc_header.h_nodeid, remid, result);
5757 	}
5758 
5759 	/* an ack for dlm_recover_locks() which waits for replies from
5760 	   all the locks it sends to new masters */
5761 	dlm_recovered_lock(r);
5762  out:
5763 	unlock_rsb(r);
5764 	put_rsb(r);
5765 	dlm_put_lkb(lkb);
5766 
5767 	return 0;
5768 }
5769 
5770 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5771 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5772 		     unsigned long timeout_cs)
5773 {
5774 	struct dlm_lkb *lkb;
5775 	struct dlm_args args;
5776 	int error;
5777 
5778 	dlm_lock_recovery(ls);
5779 
5780 	error = create_lkb(ls, &lkb);
5781 	if (error) {
5782 		kfree(ua);
5783 		goto out;
5784 	}
5785 
5786 	if (flags & DLM_LKF_VALBLK) {
5787 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5788 		if (!ua->lksb.sb_lvbptr) {
5789 			kfree(ua);
5790 			__put_lkb(ls, lkb);
5791 			error = -ENOMEM;
5792 			goto out;
5793 		}
5794 	}
5795 
5796 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5797 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
5798 	   lock and that lkb_astparam is the dlm_user_args structure. */
5799 
5800 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5801 			      fake_astfn, ua, fake_bastfn, &args);
5802 	lkb->lkb_flags |= DLM_IFL_USER;
5803 
5804 	if (error) {
5805 		__put_lkb(ls, lkb);
5806 		goto out;
5807 	}
5808 
5809 	error = request_lock(ls, lkb, name, namelen, &args);
5810 
5811 	switch (error) {
5812 	case 0:
5813 		break;
5814 	case -EINPROGRESS:
5815 		error = 0;
5816 		break;
5817 	case -EAGAIN:
5818 		error = 0;
5819 		/* fall through */
5820 	default:
5821 		__put_lkb(ls, lkb);
5822 		goto out;
5823 	}
5824 
5825 	/* add this new lkb to the per-process list of locks */
5826 	spin_lock(&ua->proc->locks_spin);
5827 	hold_lkb(lkb);
5828 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5829 	spin_unlock(&ua->proc->locks_spin);
5830  out:
5831 	dlm_unlock_recovery(ls);
5832 	return error;
5833 }
5834 
5835 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5836 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5837 		     unsigned long timeout_cs)
5838 {
5839 	struct dlm_lkb *lkb;
5840 	struct dlm_args args;
5841 	struct dlm_user_args *ua;
5842 	int error;
5843 
5844 	dlm_lock_recovery(ls);
5845 
5846 	error = find_lkb(ls, lkid, &lkb);
5847 	if (error)
5848 		goto out;
5849 
5850 	/* user can change the params on its lock when it converts it, or
5851 	   add an lvb that didn't exist before */
5852 
5853 	ua = lkb->lkb_ua;
5854 
5855 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5856 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5857 		if (!ua->lksb.sb_lvbptr) {
5858 			error = -ENOMEM;
5859 			goto out_put;
5860 		}
5861 	}
5862 	if (lvb_in && ua->lksb.sb_lvbptr)
5863 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5864 
5865 	ua->xid = ua_tmp->xid;
5866 	ua->castparam = ua_tmp->castparam;
5867 	ua->castaddr = ua_tmp->castaddr;
5868 	ua->bastparam = ua_tmp->bastparam;
5869 	ua->bastaddr = ua_tmp->bastaddr;
5870 	ua->user_lksb = ua_tmp->user_lksb;
5871 
5872 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5873 			      fake_astfn, ua, fake_bastfn, &args);
5874 	if (error)
5875 		goto out_put;
5876 
5877 	error = convert_lock(ls, lkb, &args);
5878 
5879 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5880 		error = 0;
5881  out_put:
5882 	dlm_put_lkb(lkb);
5883  out:
5884 	dlm_unlock_recovery(ls);
5885 	kfree(ua_tmp);
5886 	return error;
5887 }
5888 
5889 /*
5890  * The caller asks for an orphan lock on a given resource with a given mode.
5891  * If a matching lock exists, it's moved to the owner's list of locks and
5892  * the lkid is returned.
5893  */
5894 
5895 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5896 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5897 		     unsigned long timeout_cs, uint32_t *lkid)
5898 {
5899 	struct dlm_lkb *lkb;
5900 	struct dlm_user_args *ua;
5901 	int found_other_mode = 0;
5902 	int found = 0;
5903 	int rv = 0;
5904 
5905 	mutex_lock(&ls->ls_orphans_mutex);
5906 	list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) {
5907 		if (lkb->lkb_resource->res_length != namelen)
5908 			continue;
5909 		if (memcmp(lkb->lkb_resource->res_name, name, namelen))
5910 			continue;
5911 		if (lkb->lkb_grmode != mode) {
5912 			found_other_mode = 1;
5913 			continue;
5914 		}
5915 
5916 		found = 1;
5917 		list_del_init(&lkb->lkb_ownqueue);
5918 		lkb->lkb_flags &= ~DLM_IFL_ORPHAN;
5919 		*lkid = lkb->lkb_id;
5920 		break;
5921 	}
5922 	mutex_unlock(&ls->ls_orphans_mutex);
5923 
5924 	if (!found && found_other_mode) {
5925 		rv = -EAGAIN;
5926 		goto out;
5927 	}
5928 
5929 	if (!found) {
5930 		rv = -ENOENT;
5931 		goto out;
5932 	}
5933 
5934 	lkb->lkb_exflags = flags;
5935 	lkb->lkb_ownpid = (int) current->pid;
5936 
5937 	ua = lkb->lkb_ua;
5938 
5939 	ua->proc = ua_tmp->proc;
5940 	ua->xid = ua_tmp->xid;
5941 	ua->castparam = ua_tmp->castparam;
5942 	ua->castaddr = ua_tmp->castaddr;
5943 	ua->bastparam = ua_tmp->bastparam;
5944 	ua->bastaddr = ua_tmp->bastaddr;
5945 	ua->user_lksb = ua_tmp->user_lksb;
5946 
5947 	/*
5948 	 * The lkb reference from the ls_orphans list was not
5949 	 * removed above, and is now considered the reference
5950 	 * for the proc locks list.
5951 	 */
5952 
5953 	spin_lock(&ua->proc->locks_spin);
5954 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5955 	spin_unlock(&ua->proc->locks_spin);
5956  out:
5957 	kfree(ua_tmp);
5958 	return rv;
5959 }
5960 
5961 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5962 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5963 {
5964 	struct dlm_lkb *lkb;
5965 	struct dlm_args args;
5966 	struct dlm_user_args *ua;
5967 	int error;
5968 
5969 	dlm_lock_recovery(ls);
5970 
5971 	error = find_lkb(ls, lkid, &lkb);
5972 	if (error)
5973 		goto out;
5974 
5975 	ua = lkb->lkb_ua;
5976 
5977 	if (lvb_in && ua->lksb.sb_lvbptr)
5978 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5979 	if (ua_tmp->castparam)
5980 		ua->castparam = ua_tmp->castparam;
5981 	ua->user_lksb = ua_tmp->user_lksb;
5982 
5983 	error = set_unlock_args(flags, ua, &args);
5984 	if (error)
5985 		goto out_put;
5986 
5987 	error = unlock_lock(ls, lkb, &args);
5988 
5989 	if (error == -DLM_EUNLOCK)
5990 		error = 0;
5991 	/* from validate_unlock_args() */
5992 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5993 		error = 0;
5994 	if (error)
5995 		goto out_put;
5996 
5997 	spin_lock(&ua->proc->locks_spin);
5998 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5999 	if (!list_empty(&lkb->lkb_ownqueue))
6000 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6001 	spin_unlock(&ua->proc->locks_spin);
6002  out_put:
6003 	dlm_put_lkb(lkb);
6004  out:
6005 	dlm_unlock_recovery(ls);
6006 	kfree(ua_tmp);
6007 	return error;
6008 }
6009 
6010 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6011 		    uint32_t flags, uint32_t lkid)
6012 {
6013 	struct dlm_lkb *lkb;
6014 	struct dlm_args args;
6015 	struct dlm_user_args *ua;
6016 	int error;
6017 
6018 	dlm_lock_recovery(ls);
6019 
6020 	error = find_lkb(ls, lkid, &lkb);
6021 	if (error)
6022 		goto out;
6023 
6024 	ua = lkb->lkb_ua;
6025 	if (ua_tmp->castparam)
6026 		ua->castparam = ua_tmp->castparam;
6027 	ua->user_lksb = ua_tmp->user_lksb;
6028 
6029 	error = set_unlock_args(flags, ua, &args);
6030 	if (error)
6031 		goto out_put;
6032 
6033 	error = cancel_lock(ls, lkb, &args);
6034 
6035 	if (error == -DLM_ECANCEL)
6036 		error = 0;
6037 	/* from validate_unlock_args() */
6038 	if (error == -EBUSY)
6039 		error = 0;
6040  out_put:
6041 	dlm_put_lkb(lkb);
6042  out:
6043 	dlm_unlock_recovery(ls);
6044 	kfree(ua_tmp);
6045 	return error;
6046 }
6047 
6048 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6049 {
6050 	struct dlm_lkb *lkb;
6051 	struct dlm_args args;
6052 	struct dlm_user_args *ua;
6053 	struct dlm_rsb *r;
6054 	int error;
6055 
6056 	dlm_lock_recovery(ls);
6057 
6058 	error = find_lkb(ls, lkid, &lkb);
6059 	if (error)
6060 		goto out;
6061 
6062 	ua = lkb->lkb_ua;
6063 
6064 	error = set_unlock_args(flags, ua, &args);
6065 	if (error)
6066 		goto out_put;
6067 
6068 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6069 
6070 	r = lkb->lkb_resource;
6071 	hold_rsb(r);
6072 	lock_rsb(r);
6073 
6074 	error = validate_unlock_args(lkb, &args);
6075 	if (error)
6076 		goto out_r;
6077 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6078 
6079 	error = _cancel_lock(r, lkb);
6080  out_r:
6081 	unlock_rsb(r);
6082 	put_rsb(r);
6083 
6084 	if (error == -DLM_ECANCEL)
6085 		error = 0;
6086 	/* from validate_unlock_args() */
6087 	if (error == -EBUSY)
6088 		error = 0;
6089  out_put:
6090 	dlm_put_lkb(lkb);
6091  out:
6092 	dlm_unlock_recovery(ls);
6093 	return error;
6094 }
6095 
6096 /* lkb's that are removed from the waiters list by revert are just left on the
6097    orphans list with the granted orphan locks, to be freed by purge */
6098 
6099 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6100 {
6101 	struct dlm_args args;
6102 	int error;
6103 
6104 	hold_lkb(lkb); /* reference for the ls_orphans list */
6105 	mutex_lock(&ls->ls_orphans_mutex);
6106 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6107 	mutex_unlock(&ls->ls_orphans_mutex);
6108 
6109 	set_unlock_args(0, lkb->lkb_ua, &args);
6110 
6111 	error = cancel_lock(ls, lkb, &args);
6112 	if (error == -DLM_ECANCEL)
6113 		error = 0;
6114 	return error;
6115 }
6116 
6117 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6118    granted.  Regardless of what rsb queue the lock is on, it's removed and
6119    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6120    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6121 
6122 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6123 {
6124 	struct dlm_args args;
6125 	int error;
6126 
6127 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6128 			lkb->lkb_ua, &args);
6129 
6130 	error = unlock_lock(ls, lkb, &args);
6131 	if (error == -DLM_EUNLOCK)
6132 		error = 0;
6133 	return error;
6134 }
6135 
6136 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6137    (which does lock_rsb) due to deadlock with receiving a message that does
6138    lock_rsb followed by dlm_user_add_cb() */
6139 
6140 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6141 				     struct dlm_user_proc *proc)
6142 {
6143 	struct dlm_lkb *lkb = NULL;
6144 
6145 	mutex_lock(&ls->ls_clear_proc_locks);
6146 	if (list_empty(&proc->locks))
6147 		goto out;
6148 
6149 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6150 	list_del_init(&lkb->lkb_ownqueue);
6151 
6152 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6153 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
6154 	else
6155 		lkb->lkb_flags |= DLM_IFL_DEAD;
6156  out:
6157 	mutex_unlock(&ls->ls_clear_proc_locks);
6158 	return lkb;
6159 }
6160 
6161 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6162    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6163    which we clear here. */
6164 
6165 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6166    list, and no more device_writes should add lkb's to proc->locks list; so we
6167    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6168    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6169    them ourself. */
6170 
6171 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6172 {
6173 	struct dlm_lkb *lkb, *safe;
6174 
6175 	dlm_lock_recovery(ls);
6176 
6177 	while (1) {
6178 		lkb = del_proc_lock(ls, proc);
6179 		if (!lkb)
6180 			break;
6181 		del_timeout(lkb);
6182 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6183 			orphan_proc_lock(ls, lkb);
6184 		else
6185 			unlock_proc_lock(ls, lkb);
6186 
6187 		/* this removes the reference for the proc->locks list
6188 		   added by dlm_user_request, it may result in the lkb
6189 		   being freed */
6190 
6191 		dlm_put_lkb(lkb);
6192 	}
6193 
6194 	mutex_lock(&ls->ls_clear_proc_locks);
6195 
6196 	/* in-progress unlocks */
6197 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6198 		list_del_init(&lkb->lkb_ownqueue);
6199 		lkb->lkb_flags |= DLM_IFL_DEAD;
6200 		dlm_put_lkb(lkb);
6201 	}
6202 
6203 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6204 		memset(&lkb->lkb_callbacks, 0,
6205 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6206 		list_del_init(&lkb->lkb_cb_list);
6207 		dlm_put_lkb(lkb);
6208 	}
6209 
6210 	mutex_unlock(&ls->ls_clear_proc_locks);
6211 	dlm_unlock_recovery(ls);
6212 }
6213 
6214 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6215 {
6216 	struct dlm_lkb *lkb, *safe;
6217 
6218 	while (1) {
6219 		lkb = NULL;
6220 		spin_lock(&proc->locks_spin);
6221 		if (!list_empty(&proc->locks)) {
6222 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6223 					 lkb_ownqueue);
6224 			list_del_init(&lkb->lkb_ownqueue);
6225 		}
6226 		spin_unlock(&proc->locks_spin);
6227 
6228 		if (!lkb)
6229 			break;
6230 
6231 		lkb->lkb_flags |= DLM_IFL_DEAD;
6232 		unlock_proc_lock(ls, lkb);
6233 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6234 	}
6235 
6236 	spin_lock(&proc->locks_spin);
6237 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6238 		list_del_init(&lkb->lkb_ownqueue);
6239 		lkb->lkb_flags |= DLM_IFL_DEAD;
6240 		dlm_put_lkb(lkb);
6241 	}
6242 	spin_unlock(&proc->locks_spin);
6243 
6244 	spin_lock(&proc->asts_spin);
6245 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6246 		memset(&lkb->lkb_callbacks, 0,
6247 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6248 		list_del_init(&lkb->lkb_cb_list);
6249 		dlm_put_lkb(lkb);
6250 	}
6251 	spin_unlock(&proc->asts_spin);
6252 }
6253 
6254 /* pid of 0 means purge all orphans */
6255 
6256 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6257 {
6258 	struct dlm_lkb *lkb, *safe;
6259 
6260 	mutex_lock(&ls->ls_orphans_mutex);
6261 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6262 		if (pid && lkb->lkb_ownpid != pid)
6263 			continue;
6264 		unlock_proc_lock(ls, lkb);
6265 		list_del_init(&lkb->lkb_ownqueue);
6266 		dlm_put_lkb(lkb);
6267 	}
6268 	mutex_unlock(&ls->ls_orphans_mutex);
6269 }
6270 
6271 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6272 {
6273 	struct dlm_message *ms;
6274 	struct dlm_mhandle *mh;
6275 	int error;
6276 
6277 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6278 				DLM_MSG_PURGE, &ms, &mh);
6279 	if (error)
6280 		return error;
6281 	ms->m_nodeid = nodeid;
6282 	ms->m_pid = pid;
6283 
6284 	return send_message(mh, ms);
6285 }
6286 
6287 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6288 		   int nodeid, int pid)
6289 {
6290 	int error = 0;
6291 
6292 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6293 		error = send_purge(ls, nodeid, pid);
6294 	} else {
6295 		dlm_lock_recovery(ls);
6296 		if (pid == current->pid)
6297 			purge_proc_locks(ls, proc);
6298 		else
6299 			do_purge(ls, nodeid, pid);
6300 		dlm_unlock_recovery(ls);
6301 	}
6302 	return error;
6303 }
6304 
6305