xref: /openbmc/linux/fs/dlm/lock.c (revision 2dd6532e)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94 
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 *
 * Rows and columns are listed in the order UN NL CR CW PR PW EX PD; each
 * row below is tagged with its index and mode name.
 */

static const int __dlm_compat_matrix[8][8] = {
	/*        UN NL CR CW PR PW EX PD */
	[0] = { 1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	[1] = { 1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = { 1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	[3] = { 1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	[4] = { 1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	[5] = { 1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	[6] = { 1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
114 
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 *
 * Rows and columns are listed in the order UN NL CR CW PR PW EX PD.
 */

const int dlm_lvb_operations[8][8] = {
	/*        UN  NL  CR  CW  PR  PW  EX  PD */
	[0] = { -1,  1,  1,  1,  1,  1,  1, -1 },	/* UN */
	[1] = { -1,  1,  1,  1,  1,  1,  1,  0 },	/* NL */
	[2] = { -1, -1,  1,  1,  1,  1,  1,  0 },	/* CR */
	[3] = { -1, -1, -1,  1,  1,  1,  1,  0 },	/* CW */
	[4] = { -1, -1, -1, -1,  1,  1,  1,  0 },	/* PR */
	[5] = { -1,  0,  0,  0,  0,  0,  1,  0 },	/* PW */
	[6] = { -1,  0,  0,  0,  0,  0,  0,  0 },	/* EX */
	[7] = { -1,  0,  0,  0,  0,  0,  0,  0 },	/* PD */
};
135 
/*
 * Look up the compatibility entry for the granted mode of lkb "gr" against
 * the requested mode of lkb "rq".  Both modes are shifted by one to match
 * the layout of __dlm_compat_matrix.
 */
#define modes_compat(gr, rq) \
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
138 
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143 
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *
 * Rows and columns are listed in the order UN NL CR CW PR PW EX PD.
 */

static const int __quecvt_compat_matrix[8][8] = {
	/*        UN NL CR CW PR PW EX PD */
	[0] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	[1] = { 0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = { 0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	[3] = { 0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	[4] = { 0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	[5] = { 0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	[6] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = { 0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
161 
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169 	       (unsigned long long)lkb->lkb_recover_seq);
170 }
171 
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175 	       "rlc %d name %s\n",
176 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178 	       r->res_name);
179 }
180 
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183 	struct dlm_lkb *lkb;
184 
185 	dlm_print_rsb(r);
186 
187 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189 	printk(KERN_ERR "rsb lookup list\n");
190 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191 		dlm_print_lkb(lkb);
192 	printk(KERN_ERR "rsb grant queue:\n");
193 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194 		dlm_print_lkb(lkb);
195 	printk(KERN_ERR "rsb convert queue:\n");
196 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197 		dlm_print_lkb(lkb);
198 	printk(KERN_ERR "rsb wait queue:\n");
199 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200 		dlm_print_lkb(lkb);
201 }
202 
/* Threads cannot use the lockspace while it's being recovered */

/*
 * Take ls->ls_in_recovery for reading.  down_read() blocks until the read
 * lock can be taken; release it with dlm_unlock_recovery().
 */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
209 
/* Drop a read hold on ls->ls_in_recovery. */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
214 
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217 	return down_read_trylock(&ls->ls_in_recovery);
218 }
219 
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224 
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229 
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234 
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239 
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244 
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248 	return !!r->res_nodeid;
249 }
250 
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
/*
 * Non-zero when DLM_IFL_OVERLAP_UNLOCK is set in the lkb's flags.  The
 * masked flag value is returned as-is, not normalised to 0/1.
 */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}
278 
/*
 * Non-zero when DLM_IFL_OVERLAP_CANCEL is set in the lkb's flags.  The
 * masked flag value is returned as-is, not normalised to 0/1.
 */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287 				  DLM_IFL_OVERLAP_CANCEL));
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	del_timeout(lkb);
296 
297 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298 
299 	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
300 	   timeout caused the cancel then return -ETIMEDOUT */
301 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303 		rv = -ETIMEDOUT;
304 	}
305 
306 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308 		rv = -EDEADLK;
309 	}
310 
311 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313 
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316 	queue_cast(r, lkb,
317 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319 
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322 	if (is_master_copy(lkb)) {
323 		send_bast(r, lkb, rqmode);
324 	} else {
325 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326 	}
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking.
   The reference is taken on r->res_ref and is dropped with put_rsb(). */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}
340 
/* Non-static wrapper around hold_rsb(); the same precondition applies. */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
345 
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348 
349 static void put_rsb(struct dlm_rsb *r)
350 {
351 	struct dlm_ls *ls = r->res_ls;
352 	uint32_t bucket = r->res_bucket;
353 	int rv;
354 
355 	rv = kref_put_lock(&r->res_ref, toss_rsb,
356 			   &ls->ls_rsbtbl[bucket].lock);
357 	if (rv)
358 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
359 }
360 
/* Non-static wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
365 
366 static int pre_rsb_struct(struct dlm_ls *ls)
367 {
368 	struct dlm_rsb *r1, *r2;
369 	int count = 0;
370 
371 	spin_lock(&ls->ls_new_rsb_spin);
372 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
373 		spin_unlock(&ls->ls_new_rsb_spin);
374 		return 0;
375 	}
376 	spin_unlock(&ls->ls_new_rsb_spin);
377 
378 	r1 = dlm_allocate_rsb(ls);
379 	r2 = dlm_allocate_rsb(ls);
380 
381 	spin_lock(&ls->ls_new_rsb_spin);
382 	if (r1) {
383 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
384 		ls->ls_new_rsb_count++;
385 	}
386 	if (r2) {
387 		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
388 		ls->ls_new_rsb_count++;
389 	}
390 	count = ls->ls_new_rsb_count;
391 	spin_unlock(&ls->ls_new_rsb_spin);
392 
393 	if (!count)
394 		return -ENOMEM;
395 	return 0;
396 }
397 
398 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
399    unlock any spinlocks, go back and call pre_rsb_struct again.
400    Otherwise, take an rsb off the list and return it. */
401 
402 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
403 			  struct dlm_rsb **r_ret)
404 {
405 	struct dlm_rsb *r;
406 	int count;
407 
408 	spin_lock(&ls->ls_new_rsb_spin);
409 	if (list_empty(&ls->ls_new_rsb)) {
410 		count = ls->ls_new_rsb_count;
411 		spin_unlock(&ls->ls_new_rsb_spin);
412 		log_debug(ls, "find_rsb retry %d %d %s",
413 			  count, dlm_config.ci_new_rsb_count, name);
414 		return -EAGAIN;
415 	}
416 
417 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
418 	list_del(&r->res_hashchain);
419 	/* Convert the empty list_head to a NULL rb_node for tree usage: */
420 	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
421 	ls->ls_new_rsb_count--;
422 	spin_unlock(&ls->ls_new_rsb_spin);
423 
424 	r->res_ls = ls;
425 	r->res_length = len;
426 	memcpy(r->res_name, name, len);
427 	mutex_init(&r->res_mutex);
428 
429 	INIT_LIST_HEAD(&r->res_lookup);
430 	INIT_LIST_HEAD(&r->res_grantqueue);
431 	INIT_LIST_HEAD(&r->res_convertqueue);
432 	INIT_LIST_HEAD(&r->res_waitqueue);
433 	INIT_LIST_HEAD(&r->res_root_list);
434 	INIT_LIST_HEAD(&r->res_recover_list);
435 
436 	*r_ret = r;
437 	return 0;
438 }
439 
440 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
441 {
442 	char maxname[DLM_RESNAME_MAXLEN];
443 
444 	memset(maxname, 0, DLM_RESNAME_MAXLEN);
445 	memcpy(maxname, name, nlen);
446 	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
447 }
448 
449 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
450 			struct dlm_rsb **r_ret)
451 {
452 	struct rb_node *node = tree->rb_node;
453 	struct dlm_rsb *r;
454 	int rc;
455 
456 	while (node) {
457 		r = rb_entry(node, struct dlm_rsb, res_hashnode);
458 		rc = rsb_cmp(r, name, len);
459 		if (rc < 0)
460 			node = node->rb_left;
461 		else if (rc > 0)
462 			node = node->rb_right;
463 		else
464 			goto found;
465 	}
466 	*r_ret = NULL;
467 	return -EBADR;
468 
469  found:
470 	*r_ret = r;
471 	return 0;
472 }
473 
474 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
475 {
476 	struct rb_node **newn = &tree->rb_node;
477 	struct rb_node *parent = NULL;
478 	int rc;
479 
480 	while (*newn) {
481 		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
482 					       res_hashnode);
483 
484 		parent = *newn;
485 		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
486 		if (rc < 0)
487 			newn = &parent->rb_left;
488 		else if (rc > 0)
489 			newn = &parent->rb_right;
490 		else {
491 			log_print("rsb_insert match");
492 			dlm_dump_rsb(rsb);
493 			dlm_dump_rsb(cur);
494 			return -EEXIST;
495 		}
496 	}
497 
498 	rb_link_node(&rsb->res_hashnode, parent, newn);
499 	rb_insert_color(&rsb->res_hashnode, tree);
500 	return 0;
501 }
502 
503 /*
504  * Find rsb in rsbtbl and potentially create/add one
505  *
506  * Delaying the release of rsb's has a similar benefit to applications keeping
507  * NL locks on an rsb, but without the guarantee that the cached master value
508  * will still be valid when the rsb is reused.  Apps aren't always smart enough
509  * to keep NL locks on an rsb that they may lock again shortly; this can lead
510  * to excessive master lookups and removals if we don't delay the release.
511  *
512  * Searching for an rsb means looking through both the normal list and toss
513  * list.  When found on the toss list the rsb is moved to the normal list with
514  * ref count of 1; when found on normal list the ref count is incremented.
515  *
516  * rsb's on the keep list are being used locally and refcounted.
517  * rsb's on the toss list are not being used locally, and are not refcounted.
518  *
519  * The toss list rsb's were either
520  * - previously used locally but not any more (were on keep list, then
521  *   moved to toss list when last refcount dropped)
522  * - created and put on toss list as a directory record for a lookup
523  *   (we are the dir node for the res, but are not using the res right now,
524  *   but some other node is)
525  *
526  * The purpose of find_rsb() is to return a refcounted rsb for local use.
527  * So, if the given rsb is on the toss list, it is moved to the keep list
528  * before being returned.
529  *
530  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
531  * more refcounts exist, so the rsb is moved from the keep list to the
532  * toss list.
533  *
 * rsb's on both keep and toss lists are used for name to master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
536  * the keep list, rsb's that are not in use locally (not refcounted) and
537  * only exist for name/master lookups are on the toss list.
538  *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
540  * name/master mappings.  So, remote requests on such rsb's can potentially
541  * return with an error, which means the mapping is stale and needs to
542  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
543  * first_lkid is to keep only a single outstanding request on an rsb
544  * while that rsb has a potentially stale master.)
545  */
546 
/*
 * find_rsb() variant called when dlm_no_directory(ls) is false.
 *
 * @name/@len:    resource name and its length
 * @hash, @b:     name hash and rsbtbl bucket index, computed by find_rsb()
 * @dir_nodeid:   directory node for this resource
 * @from_nodeid:  sender of the request, compared against @dir_nodeid when
 *                R_RECEIVE_REQUEST is set
 * @flags:        R_REQUEST, R_RECEIVE_REQUEST or another R_* value; only
 *                the first two enable creation of a missing rsb
 * @r_ret:        always written on exit; may be NULL or an rsb that must
 *                not be used when an error is returned
 *
 * Lookup order under the bucket spinlock: keep tree (takes a reference),
 * then toss tree (moves the rsb to the keep tree), then a new rsb from the
 * preallocated pool.  When the pool is empty the bucket lock is dropped
 * and the whole lookup restarts from pre_rsb_struct().
 *
 * Returns 0 on success, -EBADR when the rsb does not exist and may not be
 * created, -ENOTBLK when a request from another node cannot be served
 * here, or an error from pre_rsb_struct()/rsb_insert().
 */
static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
			uint32_t hash, uint32_t b,
			int dir_nodeid, int from_nodeid,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int from_local = 0;
	int from_other = 0;
	int from_dir = 0;
	int create = 0;
	int error;

	/* classify the origin of the request; at most one flag is set */
	if (flags & R_RECEIVE_REQUEST) {
		if (from_nodeid == dir_nodeid)
			from_dir = 1;
		else
			from_other = 1;
	} else if (flags & R_REQUEST) {
		from_local = 1;
	}

	/*
	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
	 * we're the new master.  Our local recovery may not have set
	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
	 * create the rsb; dlm_recover_process_copy() will handle EBADR
	 * by resending.
	 *
	 * If someone sends us a request, we are the dir node, and we do
	 * not find the rsb anywhere, then recreate it.  This happens if
	 * someone sends us a request after we have removed/freed an rsb
	 * from our toss list.  (They sent a request instead of lookup
	 * because they are using an rsb from their toss list.)
	 */

	if (from_local || from_dir ||
	    (from_other && (dir_nodeid == our_nodeid))) {
		create = 1;
	}

 retry:
	/* refill the rsb pool before taking the bucket spinlock */
	if (create) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
	 * is using this rsb because it's on the toss list, so we can
	 * look at or update res_master_nodeid without lock_rsb.
	 */

	if ((r->res_master_nodeid != our_nodeid) && from_other) {
		/* our rsb was not master, and another node (not the dir node)
		   has sent us a request */
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
		/* don't think this should ever happen */
		log_error(ls, "find_rsb toss from_dir %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		/* fix it and go on */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	if (from_local && (r->res_master_nodeid != our_nodeid)) {
		/* Because we have held no locks on this rsb,
		   res_master_nodeid could have become stale. */
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	/* move the rsb from the toss tree to the keep tree */
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	if (error == -EBADR && !create)
		goto out_unlock;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		/* pool is empty: drop the bucket lock and start over */
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	kref_init(&r->res_ref);

	if (from_dir) {
		/* want to see how often this happens */
		log_debug(ls, "find_rsb new from_dir %d recreate %s",
			  from_nodeid, r->res_name);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		goto out_add;
	}

	if (from_other && (dir_nodeid != our_nodeid)) {
		/* should never happen */
		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
		dlm_free_rsb(r);
		r = NULL;
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (from_other) {
		log_debug(ls, "find_rsb new from_other %d dir %d %s",
			  from_nodeid, dir_nodeid, r->res_name);
	}

	if (dir_nodeid == our_nodeid) {
		/* When we are the dir nodeid, we can set the master
		   node immediately */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	} else {
		/* set_master will send_lookup to dir_nodeid */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
	}

 out_add:
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}
720 
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourself master (in
   dlm_recover_masters). */

/*
 * find_rsb() variant called when dlm_no_directory(ls) is true.
 *
 * Parameters are the same as for find_rsb_dir().  A missing rsb is always
 * created here, with dir_nodeid as its master.  The R_RECEIVE_RECOVER bit
 * in @flags disables the master-nodeid sanity checks on an rsb found on
 * the toss tree.
 *
 * Lookup order under the bucket spinlock: keep tree (takes a reference),
 * then toss tree (moves the rsb to the keep tree), then a new rsb from the
 * preallocated pool; an empty pool drops the lock and restarts.
 *
 * *r_ret is always written on exit.  Returns 0 on success, -ENOTBLK when a
 * non-recovery request from another node hits a toss-list rsb we do not
 * master, or an error from pre_rsb_struct()/rsb_insert().
 */
static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
			  uint32_t hash, uint32_t b,
			  int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	/* refill the rsb pool before taking the bucket spinlock */
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	/* move the rsb from the toss tree to the keep tree */
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		/* pool is empty: drop the bucket lock and start over */
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	/* res_nodeid is 0 when we are the master, else the master's nodeid */
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}
818 
819 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
820 		    unsigned int flags, struct dlm_rsb **r_ret)
821 {
822 	uint32_t hash, b;
823 	int dir_nodeid;
824 
825 	if (len > DLM_RESNAME_MAXLEN)
826 		return -EINVAL;
827 
828 	hash = jhash(name, len, 0);
829 	b = hash & (ls->ls_rsbtbl_size - 1);
830 
831 	dir_nodeid = dlm_hash2nodeid(ls, hash);
832 
833 	if (dlm_no_directory(ls))
834 		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
835 				      from_nodeid, flags, r_ret);
836 	else
837 		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
838 				      from_nodeid, flags, r_ret);
839 }
840 
841 /* we have received a request and found that res_master_nodeid != our_nodeid,
842    so we need to return an error or make ourself the master */
843 
844 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
845 				  int from_nodeid)
846 {
847 	if (dlm_no_directory(ls)) {
848 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
849 			  from_nodeid, r->res_master_nodeid,
850 			  r->res_dir_nodeid);
851 		dlm_print_rsb(r);
852 		return -ENOTBLK;
853 	}
854 
855 	if (from_nodeid != r->res_dir_nodeid) {
856 		/* our rsb is not master, and another node (not the dir node)
857 	   	   has sent us a request.  this is much more common when our
858 	   	   master_nodeid is zero, so limit debug to non-zero.  */
859 
860 		if (r->res_master_nodeid) {
861 			log_debug(ls, "validate master from_other %d master %d "
862 				  "dir %d first %x %s", from_nodeid,
863 				  r->res_master_nodeid, r->res_dir_nodeid,
864 				  r->res_first_lkid, r->res_name);
865 		}
866 		return -ENOTBLK;
867 	} else {
868 		/* our rsb is not master, but the dir nodeid has sent us a
869 	   	   request; this could happen with master 0 / res_nodeid -1 */
870 
871 		if (r->res_master_nodeid) {
872 			log_error(ls, "validate master from_dir %d master %d "
873 				  "first %x %s",
874 				  from_nodeid, r->res_master_nodeid,
875 				  r->res_first_lkid, r->res_name);
876 		}
877 
878 		r->res_master_nodeid = dlm_our_nodeid();
879 		r->res_nodeid = 0;
880 		return 0;
881 	}
882 }
883 
/*
 * Resolve (and possibly repair) the master nodeid recorded in rsb @r on
 * behalf of @from_nodeid.
 *
 * @our_nodeid:  the local nodeid
 * @toss_list:   true when @r was found on the toss list; only used to log
 *               an error if a master fix-up happens there
 * @flags:       DLM_LU_RECOVER_MASTER enables replacing a removed master
 *               with @from_nodeid; DLM_LU_RECOVER_DIR means @from_nodeid
 *               is reporting itself as the master
 * @r_nodeid:    receives r->res_master_nodeid as it stands on exit
 * @result:      optional; set to DLM_LU_MATCH when non-NULL
 *
 * The checks run in a fixed order and each may rewrite res_master_nodeid
 * and res_nodeid before the next one looks at them.
 */
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
				int from_nodeid, bool toss_list, unsigned int flags,
				int *r_nodeid, int *result)
{
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int from_master = (flags & DLM_LU_RECOVER_DIR);

	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "%s res_dir %d our %d %s", __func__,
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		 * the previous master failed.  Setting NEW_MASTER will
		 * force dlm_recover_masters to call recover_master on this
		 * rsb even though the res_nodeid is no longer removed.
		 */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "%s fix_master on toss", __func__);
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		 * a previous recovery cycle, and we aborted the previous
		 * cycle before recovering this master value
		 */

		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
			  __func__, from_nodeid, r->res_master_nodeid,
			  r->res_nodeid, r->res_first_lkid, r->res_name);

		/* we are recorded as master: keep that and skip the rest */
		if (r->res_master_nodeid == our_nodeid) {
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto ret_assign;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		 * up the master for this rsb
		 */

		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		 * finds the rsb on the keep list and ignores the remove,
		 * and the former master sends a lookup
		 */

		log_limit(ls, "%s from master %d flags %x first %x %s",
			  __func__, from_nodeid, flags, r->res_first_lkid,
			  r->res_name);
	}

 ret_assign:
	/* r_nodeid is written unconditionally; result only when provided */
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;
}
965 
966 /*
967  * We're the dir node for this res and another node wants to know the
968  * master nodeid.  During normal operation (non recovery) this is only
969  * called from receive_lookup(); master lookups when the local node is
970  * the dir node are done by find_rsb().
971  *
972  * normal operation, we are the dir node for a resource
973  * . _request_lock
974  * . set_master
975  * . send_lookup
976  * . receive_lookup
977  * . dlm_master_lookup flags 0
978  *
979  * recover directory, we are rebuilding dir for all resources
980  * . dlm_recover_directory
981  * . dlm_rcom_names
982  *   remote node sends back the rsb names it is master of and we are dir of
983  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
984  *   we either create new rsb setting remote node as master, or find existing
985  *   rsb and set master to be the remote node.
986  *
987  * recover masters, we are finding the new master for resources
988  * . dlm_recover_masters
989  * . recover_master
990  * . dlm_send_rcom_lookup
991  * . receive_rcom_lookup
992  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
993  */
994 
/*
 * ls:          lockspace holding the rsb hash table
 * from_nodeid: node the lookup came from; must not be our own nodeid
 * name, len:   resource name, at most DLM_RESNAME_MAXLEN bytes
 * flags:       handed unchanged to __dlm_master_lookup()
 * r_nodeid:    out, master nodeid (set to -1 when we are not the dir node)
 * result:      optional out; set to DLM_LU_ADD when a new record is
 *              created below, otherwise left to __dlm_master_lookup()
 *
 * Returns 0 when the lookup was handled, -EINVAL for an over-long name,
 * a lookup from ourselves or a name whose dir node is not us, or the
 * error returned by pre_rsb_struct()/get_rsb_struct().
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
		      unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	/* bucket index is the hash masked by the table size
	 * (presumably a power of two -- confirm against the table setup)
	 */
	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	/* only the dir node for this hash may answer the lookup */
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	/* called before the bucket lock is taken; get_rsb_struct() below
	 * returns -EAGAIN to send us back here
	 */
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error) {
		/* because the rsb is active, we need to lock_rsb before
		 * checking/changing res_master_nodeid
		 */

		hold_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		lock_rsb(r);

		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
				    flags, r_nodeid, result);

		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);

		return 0;
	}

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto not_found;

	/* because the rsb is inactive (on toss list), it's not refcounted
	 * and lock_rsb is not used, but is protected by the rsbtbl lock
	 */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	/* restart the toss timer for the record that was just looked up */
	r->res_toss_time = jiffies;
	/* the rsb was inactive (on toss list) */
	spin_unlock(&ls->ls_rsbtbl[b].lock);

	return 0;

 not_found:
	/* no record in either tree: create one naming the sender as master */
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	kref_init(&r->res_ref);
	r->res_toss_time = jiffies;

	/* the new record goes straight onto the toss tree */
	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
	if (error) {
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
1099 
1100 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1101 {
1102 	struct rb_node *n;
1103 	struct dlm_rsb *r;
1104 	int i;
1105 
1106 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1107 		spin_lock(&ls->ls_rsbtbl[i].lock);
1108 		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1109 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
1110 			if (r->res_hash == hash)
1111 				dlm_dump_rsb(r);
1112 		}
1113 		spin_unlock(&ls->ls_rsbtbl[i].lock);
1114 	}
1115 }
1116 
1117 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1118 {
1119 	struct dlm_rsb *r = NULL;
1120 	uint32_t hash, b;
1121 	int error;
1122 
1123 	hash = jhash(name, len, 0);
1124 	b = hash & (ls->ls_rsbtbl_size - 1);
1125 
1126 	spin_lock(&ls->ls_rsbtbl[b].lock);
1127 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1128 	if (!error)
1129 		goto out_dump;
1130 
1131 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1132 	if (error)
1133 		goto out;
1134  out_dump:
1135 	dlm_dump_rsb(r);
1136  out:
1137 	spin_unlock(&ls->ls_rsbtbl[b].lock);
1138 }
1139 
1140 static void toss_rsb(struct kref *kref)
1141 {
1142 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1143 	struct dlm_ls *ls = r->res_ls;
1144 
1145 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1146 	kref_init(&r->res_ref);
1147 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1148 	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1149 	r->res_toss_time = jiffies;
1150 	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1151 	if (r->res_lvbptr) {
1152 		dlm_free_lvb(r->res_lvbptr);
1153 		r->res_lvbptr = NULL;
1154 	}
1155 }
1156 
1157 /* See comment for unhold_lkb */
1158 
/*
 * Drop one rsb reference that the caller expects not to be the last:
 * toss_rsb() is supplied as the release function, but the assertion
 * fires if kref_put() reports that the release actually ran.
 */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
1165 
/*
 * kref release function used by shrink_bucket() when it drops the last
 * reference of a tossed rsb.  It frees nothing itself; it only asserts
 * that the rsb is no longer linked on any of its lists.
 */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
1180 
1181 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1182    The rsb must exist as long as any lkb's for it do. */
1183 
/* Take a reference on r and record it as the resource of lkb. */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
1189 
1190 static void detach_lkb(struct dlm_lkb *lkb)
1191 {
1192 	if (lkb->lkb_resource) {
1193 		put_rsb(lkb->lkb_resource);
1194 		lkb->lkb_resource = NULL;
1195 	}
1196 }
1197 
1198 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1199 		       int start, int end)
1200 {
1201 	struct dlm_lkb *lkb;
1202 	int rv;
1203 
1204 	lkb = dlm_allocate_lkb(ls);
1205 	if (!lkb)
1206 		return -ENOMEM;
1207 
1208 	lkb->lkb_nodeid = -1;
1209 	lkb->lkb_grmode = DLM_LOCK_IV;
1210 	kref_init(&lkb->lkb_ref);
1211 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1212 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1213 	INIT_LIST_HEAD(&lkb->lkb_time_list);
1214 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
1215 	mutex_init(&lkb->lkb_cb_mutex);
1216 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1217 
1218 	idr_preload(GFP_NOFS);
1219 	spin_lock(&ls->ls_lkbidr_spin);
1220 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1221 	if (rv >= 0)
1222 		lkb->lkb_id = rv;
1223 	spin_unlock(&ls->ls_lkbidr_spin);
1224 	idr_preload_end();
1225 
1226 	if (rv < 0) {
1227 		log_error(ls, "create_lkb idr error %d", rv);
1228 		dlm_free_lkb(lkb);
1229 		return rv;
1230 	}
1231 
1232 	*lkb_ret = lkb;
1233 	return 0;
1234 }
1235 
/*
 * Create an lkb whose id is allocated starting at 1.  The end value of 0
 * is handed to idr_alloc() unchanged (NOTE(review): presumably "no upper
 * bound" per the idr API -- confirm).
 */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, 0);
}
1240 
1241 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1242 {
1243 	struct dlm_lkb *lkb;
1244 
1245 	spin_lock(&ls->ls_lkbidr_spin);
1246 	lkb = idr_find(&ls->ls_lkbidr, lkid);
1247 	if (lkb)
1248 		kref_get(&lkb->lkb_ref);
1249 	spin_unlock(&ls->ls_lkbidr_spin);
1250 
1251 	*lkb_ret = lkb;
1252 	return lkb ? 0 : -ENOENT;
1253 }
1254 
/*
 * kref release function for an lkb (see __put_lkb() and unhold_lkb()).
 * It only asserts that lkb_status is clear, i.e. that del_lkb() has
 * already reset it; the teardown itself is done by __put_lkb().
 */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
1264 
1265 /* __put_lkb() is used when an lkb may not have an rsb attached to
1266    it so we need to provide the lockspace explicitly */
1267 
1268 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1269 {
1270 	uint32_t lkid = lkb->lkb_id;
1271 	int rv;
1272 
1273 	rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1274 			   &ls->ls_lkbidr_spin);
1275 	if (rv) {
1276 		idr_remove(&ls->ls_lkbidr, lkid);
1277 		spin_unlock(&ls->ls_lkbidr_spin);
1278 
1279 		detach_lkb(lkb);
1280 
1281 		/* for local/process lkbs, lvbptr points to caller's lksb */
1282 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1283 			dlm_free_lvb(lkb->lkb_lvbptr);
1284 		dlm_free_lkb(lkb);
1285 	}
1286 
1287 	return rv;
1288 }
1289 
1290 int dlm_put_lkb(struct dlm_lkb *lkb)
1291 {
1292 	struct dlm_ls *ls;
1293 
1294 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1295 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1296 
1297 	ls = lkb->lkb_resource->res_ls;
1298 	return __put_lkb(ls, lkb);
1299 }
1300 
1301 /* This is only called to add a reference when the code already holds
1302    a valid reference to the lkb, so there's no need for locking. */
1303 
/* Take one additional reference on lkb; no lock is taken here. */
static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
1308 
1309 /* This is called when we need to remove a reference and are certain
1310    it's not the last ref.  e.g. del_lkb is always called between a
1311    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312    put_lkb would work fine, but would involve unnecessary locking */
1313 
/*
 * Drop one lkb reference without taking ls_lkbidr_spin.  The assertion
 * fires if this turned out to be the last reference, i.e. if kref_put()
 * reports that kill_lkb() ran.
 */
static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
1320 
1321 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1322 			    int mode)
1323 {
1324 	struct dlm_lkb *lkb = NULL, *iter;
1325 
1326 	list_for_each_entry(iter, head, lkb_statequeue)
1327 		if (iter->lkb_rqmode < mode) {
1328 			lkb = iter;
1329 			list_add_tail(new, &iter->lkb_statequeue);
1330 			break;
1331 		}
1332 
1333 	if (!lkb)
1334 		list_add_tail(new, head);
1335 }
1336 
1337 /* add/remove lkb to rsb's grant/convert/wait queue */
1338 
1339 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1340 {
1341 	kref_get(&lkb->lkb_ref);
1342 
1343 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1344 
1345 	lkb->lkb_timestamp = ktime_get();
1346 
1347 	lkb->lkb_status = status;
1348 
1349 	switch (status) {
1350 	case DLM_LKSTS_WAITING:
1351 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1352 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1353 		else
1354 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1355 		break;
1356 	case DLM_LKSTS_GRANTED:
1357 		/* convention says granted locks kept in order of grmode */
1358 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1359 				lkb->lkb_grmode);
1360 		break;
1361 	case DLM_LKSTS_CONVERT:
1362 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1363 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1364 		else
1365 			list_add_tail(&lkb->lkb_statequeue,
1366 				      &r->res_convertqueue);
1367 		break;
1368 	default:
1369 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1370 	}
1371 }
1372 
/*
 * Inverse of add_lkb(): clear lkb_status, unlink the lkb from whichever
 * state queue it is on and drop the reference add_lkb() took.  The rsb
 * argument is not used by the body.
 */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
1379 
/*
 * Move lkb from its current queue on r to the queue selected by sts.
 * An extra reference is held across the del/add pair so that the
 * unhold_lkb() inside del_lkb() cannot be dropping the last reference.
 */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
1387 
1388 static int msg_reply_type(int mstype)
1389 {
1390 	switch (mstype) {
1391 	case DLM_MSG_REQUEST:
1392 		return DLM_MSG_REQUEST_REPLY;
1393 	case DLM_MSG_CONVERT:
1394 		return DLM_MSG_CONVERT_REPLY;
1395 	case DLM_MSG_UNLOCK:
1396 		return DLM_MSG_UNLOCK_REPLY;
1397 	case DLM_MSG_CANCEL:
1398 		return DLM_MSG_CANCEL_REPLY;
1399 	case DLM_MSG_LOOKUP:
1400 		return DLM_MSG_LOOKUP_REPLY;
1401 	}
1402 	return -1;
1403 }
1404 
/*
 * Track which nodeids have already been warned about.  "warned" is an
 * array of num_nodes slots in which 0 marks an unused slot.
 *
 * Returns 1 when nodeid is already recorded.  Otherwise returns 0, after
 * storing nodeid in the first unused slot if one is reached before the
 * end of the array.
 */
static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int idx = 0;

	while (idx < num_nodes) {
		int seen = warned[idx];

		if (seen == 0) {
			warned[idx] = nodeid;
			return 0;
		}
		if (seen == nodeid)
			return 1;
		idx++;
	}

	return 0;
}
1419 
1420 void dlm_scan_waiters(struct dlm_ls *ls)
1421 {
1422 	struct dlm_lkb *lkb;
1423 	s64 us;
1424 	s64 debug_maxus = 0;
1425 	u32 debug_scanned = 0;
1426 	u32 debug_expired = 0;
1427 	int num_nodes = 0;
1428 	int *warned = NULL;
1429 
1430 	if (!dlm_config.ci_waitwarn_us)
1431 		return;
1432 
1433 	mutex_lock(&ls->ls_waiters_mutex);
1434 
1435 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1436 		if (!lkb->lkb_wait_time)
1437 			continue;
1438 
1439 		debug_scanned++;
1440 
1441 		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1442 
1443 		if (us < dlm_config.ci_waitwarn_us)
1444 			continue;
1445 
1446 		lkb->lkb_wait_time = 0;
1447 
1448 		debug_expired++;
1449 		if (us > debug_maxus)
1450 			debug_maxus = us;
1451 
1452 		if (!num_nodes) {
1453 			num_nodes = ls->ls_num_nodes;
1454 			warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1455 		}
1456 		if (!warned)
1457 			continue;
1458 		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1459 			continue;
1460 
1461 		log_error(ls, "waitwarn %x %lld %d us check connection to "
1462 			  "node %d", lkb->lkb_id, (long long)us,
1463 			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1464 	}
1465 	mutex_unlock(&ls->ls_waiters_mutex);
1466 	kfree(warned);
1467 
1468 	if (debug_expired)
1469 		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1470 			  debug_scanned, debug_expired,
1471 			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1472 }
1473 
1474 /* add/remove lkb from global waiters list of lkb's waiting for
1475    a reply from a remote node */
1476 
/*
 * Note that lkb now waits for a reply to a message of type mstype sent
 * to to_nodeid.  Everything is done under ls_waiters_mutex.
 *
 * Returns 0 on success.  Returns -EINVAL when an unlock already overlaps
 * the lkb, or when a cancel is requested while a cancel already overlaps
 * it.  Returns -EBUSY when another operation is outstanding and mstype
 * is neither an unlock nor a cancel.  Errors are logged.
 */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	/* something is already outstanding on this lkb: only an unlock or
	   a cancel may be stacked on top of it, recorded as an overlap
	   flag plus one more wait count and reference */
	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	/* nothing outstanding, so the wait count must still be zero */
	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	/* first wait on this lkb: record it and link it on ls_waiters,
	   holding a reference for as long as it is listed */
	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
1529 
1530 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1531    list as part of process_requestqueue (e.g. a lookup that has an optimized
1532    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1533    set RESEND and dlm_recover_waiters_post() */
1534 
/*
 * Account for a reply of type mstype on lkb.  No lock is taken here;
 * remove_from_waiters() and remove_from_waiters_ms() wrap this in
 * ls_waiters_mutex (the latter skips it for stub messages).
 *
 * ms is the received message, or NULL when there is none.
 *
 * Returns 0 when one wait was accounted for (the lkb leaves ls_waiters
 * once its wait count reaches zero), and -1 when the reply matches
 * nothing the lkb is waiting for.
 */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	/* reply to an unlock that overlapped an earlier operation */
	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	/* reply to a cancel that overlapped an earlier operation */
	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		/* this count/ref pair belongs to the cancel; the convert's
		   own pair is dropped at out_del */
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
		  lkb->lkb_remid, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* drop the count and reference of the wait this reply answers */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1621 
/*
 * Run _remove_from_waiters() for a reply of type mstype under
 * ls_waiters_mutex, with no message attached (ms is NULL).  Returns its
 * result.
 */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
1632 
1633 /* Handles situations where we might be processing a "fake" or "stub" reply in
1634    which we can't try to take waiters_mutex again. */
1635 
1636 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1637 {
1638 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1639 	int error;
1640 
1641 	if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1642 		mutex_lock(&ls->ls_waiters_mutex);
1643 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1644 	if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
1645 		mutex_unlock(&ls->ls_waiters_mutex);
1646 	return error;
1647 }
1648 
1649 /* If there's an rsb for the same resource being removed, ensure
1650  * that the remove message is sent before the new lookup message.
1651  */
1652 
/*
 * True while ls_remove_len is non-zero and ls_remove_name matches the
 * name of rsb r (rsb_cmp() returning 0).  Both parameters are
 * parenthesized so that any pointer expression can be passed; note that
 * ls is evaluated more than once.
 */
#define DLM_WAIT_PENDING_COND(ls, r)			\
	((ls)->ls_remove_len &&				\
	 !rsb_cmp((r), (ls)->ls_remove_name,		\
		  (ls)->ls_remove_len))
1657 
1658 static void wait_pending_remove(struct dlm_rsb *r)
1659 {
1660 	struct dlm_ls *ls = r->res_ls;
1661  restart:
1662 	spin_lock(&ls->ls_remove_spin);
1663 	if (DLM_WAIT_PENDING_COND(ls, r)) {
1664 		log_debug(ls, "delay lookup for remove dir %d %s",
1665 			  r->res_dir_nodeid, r->res_name);
1666 		spin_unlock(&ls->ls_remove_spin);
1667 		wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
1668 		goto restart;
1669 	}
1670 	spin_unlock(&ls->ls_remove_spin);
1671 }
1672 
1673 /*
1674  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1675  * read by other threads in wait_pending_remove.  ls_remove_names
1676  * and ls_remove_lens are only used by the scan thread, so they do
1677  * not need protection.
1678  */
1679 
/*
 * Free rsbs on the toss tree of bucket b whose toss time is older than
 * dlm_config.ci_toss_secs.  Returns at once when the bucket is not
 * flagged DLM_RTF_SHRINK.
 *
 * First pass, under the bucket lock: rsbs that need no message are freed
 * directly.  Up to DLM_REMOVE_NAMES_MAX rsbs that we master but whose dir
 * node is remote are only recorded by name.
 *
 * Second pass, one recorded name at a time: the rsb is looked up again
 * and re-checked under the bucket lock, removed from the toss tree, and
 * send_remove() is called with ls_remove_name published for the duration
 * so that wait_pending_remove() can hold back lookups of that name.
 */
static void shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	char *name;
	int our_nodeid = dlm_our_nodeid();
	int remote_count = 0;
	int need_shrink = 0;
	int i, len, rv;

	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);

	spin_lock(&ls->ls_rsbtbl[b].lock);

	if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		return;
	}

	/* next is fetched up front because r may be erased and freed below */
	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
		next = rb_next(n);
		r = rb_entry(n, struct dlm_rsb, res_hashnode);

		/* If we're the directory record for this rsb, and
		   we're not the master of it, then we need to wait
		   for the master node to send us a dir remove
		   before removing the dir record. */

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid != our_nodeid) &&
		    (dlm_dir_nodeid(r) == our_nodeid)) {
			continue;
		}

		/* at least one rsb remains that a later scan may free */
		need_shrink = 1;

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			continue;
		}

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid == our_nodeid) &&
		    (dlm_dir_nodeid(r) != our_nodeid)) {

			/* We're the master of this rsb but we're not
			   the directory record, so we need to tell the
			   dir node to remove the dir record. */

			ls->ls_remove_lens[remote_count] = r->res_length;
			memcpy(ls->ls_remove_names[remote_count], r->res_name,
			       DLM_RESNAME_MAXLEN);
			remote_count++;

			if (remote_count >= DLM_REMOVE_NAMES_MAX)
				break;
			continue;
		}

		/* kill_rsb only runs when this was the last reference */
		if (!kref_put(&r->res_ref, kill_rsb)) {
			log_error(ls, "tossed rsb in use %s", r->res_name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
		dlm_free_rsb(r);
	}

	if (need_shrink)
		ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
	else
		ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
	spin_unlock(&ls->ls_rsbtbl[b].lock);

	/*
	 * While searching for rsb's to free, we found some that require
	 * remote removal.  We leave them in place and find them again here
	 * so there is a very small gap between removing them from the toss
	 * list and sending the removal.  Keeping this gap small is
	 * important to keep us (the master node) from being out of sync
	 * with the remote dir node for very long.
	 *
	 * From the time the rsb is removed from toss until just after
	 * send_remove, the rsb name is saved in ls_remove_name.  A new
	 * lookup checks this to ensure that a new lookup message for the
	 * same resource name is not sent just before the remove message.
	 */

	for (i = 0; i < remote_count; i++) {
		name = ls->ls_remove_names[i];
		len = ls->ls_remove_lens[i];

		/* the bucket lock was dropped since the first pass, so every
		   condition is checked again before the rsb is removed */
		spin_lock(&ls->ls_rsbtbl[b].lock);
		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
		if (rv) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name not toss %s", name);
			continue;
		}

		if (r->res_master_nodeid != our_nodeid) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name master %d dir %d our %d %s",
				  r->res_master_nodeid, r->res_dir_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (r->res_dir_nodeid == our_nodeid) {
			/* should never happen */
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name dir %d master %d our %d %s",
				  r->res_dir_nodeid, r->res_master_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name toss_time %lu now %lu %s",
				  r->res_toss_time, jiffies, name);
			continue;
		}

		if (!kref_put(&r->res_ref, kill_rsb)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name in use %s", name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);

		/* block lookup of same name until we've sent remove */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = len;
		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);
		spin_unlock(&ls->ls_rsbtbl[b].lock);

		send_remove(r);

		/* allow lookup of name again */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = 0;
		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);
		wake_up(&ls->ls_remove_wait);

		dlm_free_rsb(r);
	}
}
1832 
1833 void dlm_scan_rsbs(struct dlm_ls *ls)
1834 {
1835 	int i;
1836 
1837 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1838 		shrink_bucket(ls, i);
1839 		if (dlm_locking_stopped(ls))
1840 			break;
1841 		cond_resched();
1842 	}
1843 }
1844 
1845 static void add_timeout(struct dlm_lkb *lkb)
1846 {
1847 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1848 
1849 	if (is_master_copy(lkb))
1850 		return;
1851 
1852 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1853 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1854 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1855 		goto add_it;
1856 	}
1857 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1858 		goto add_it;
1859 	return;
1860 
1861  add_it:
1862 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1863 	mutex_lock(&ls->ls_timeout_mutex);
1864 	hold_lkb(lkb);
1865 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1866 	mutex_unlock(&ls->ls_timeout_mutex);
1867 }
1868 
1869 static void del_timeout(struct dlm_lkb *lkb)
1870 {
1871 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1872 
1873 	mutex_lock(&ls->ls_timeout_mutex);
1874 	if (!list_empty(&lkb->lkb_time_list)) {
1875 		list_del_init(&lkb->lkb_time_list);
1876 		unhold_lkb(lkb);
1877 	}
1878 	mutex_unlock(&ls->ls_timeout_mutex);
1879 }
1880 
1881 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1882    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1883    and then lock rsb because of lock ordering in add_timeout.  We may need
1884    to specify some special timeout-related bits in the lkb that are just to
1885    be accessed under the timeout_mutex. */
1886 
1887 void dlm_scan_timeout(struct dlm_ls *ls)
1888 {
1889 	struct dlm_rsb *r;
1890 	struct dlm_lkb *lkb = NULL, *iter;
1891 	int do_cancel, do_warn;
1892 	s64 wait_us;
1893 
1894 	for (;;) {
1895 		if (dlm_locking_stopped(ls))
1896 			break;
1897 
1898 		do_cancel = 0;
1899 		do_warn = 0;
1900 		mutex_lock(&ls->ls_timeout_mutex);
1901 		list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1902 
1903 			wait_us = ktime_to_us(ktime_sub(ktime_get(),
1904 							iter->lkb_timestamp));
1905 
1906 			if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1907 			    wait_us >= (iter->lkb_timeout_cs * 10000))
1908 				do_cancel = 1;
1909 
1910 			if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1911 			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
1912 				do_warn = 1;
1913 
1914 			if (!do_cancel && !do_warn)
1915 				continue;
1916 			hold_lkb(iter);
1917 			lkb = iter;
1918 			break;
1919 		}
1920 		mutex_unlock(&ls->ls_timeout_mutex);
1921 
1922 		if (!lkb)
1923 			break;
1924 
1925 		r = lkb->lkb_resource;
1926 		hold_rsb(r);
1927 		lock_rsb(r);
1928 
1929 		if (do_warn) {
1930 			/* clear flag so we only warn once */
1931 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1932 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1933 				del_timeout(lkb);
1934 			dlm_timeout_warn(lkb);
1935 		}
1936 
1937 		if (do_cancel) {
1938 			log_debug(ls, "timeout cancel %x node %d %s",
1939 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1940 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1941 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1942 			del_timeout(lkb);
1943 			_cancel_lock(r, lkb);
1944 		}
1945 
1946 		unlock_rsb(r);
1947 		unhold_rsb(r);
1948 		dlm_put_lkb(lkb);
1949 	}
1950 }
1951 
1952 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1953    dlm_recoverd before checking/setting ls_recover_begin. */
1954 
1955 void dlm_adjust_timeouts(struct dlm_ls *ls)
1956 {
1957 	struct dlm_lkb *lkb;
1958 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1959 
1960 	ls->ls_recover_begin = 0;
1961 	mutex_lock(&ls->ls_timeout_mutex);
1962 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1963 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1964 	mutex_unlock(&ls->ls_timeout_mutex);
1965 
1966 	if (!dlm_config.ci_waitwarn_us)
1967 		return;
1968 
1969 	mutex_lock(&ls->ls_waiters_mutex);
1970 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1971 		if (ktime_to_us(lkb->lkb_wait_time))
1972 			lkb->lkb_wait_time = ktime_get();
1973 	}
1974 	mutex_unlock(&ls->ls_waiters_mutex);
1975 }
1976 
1977 /* lkb is master or local copy */
1978 
1979 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1980 {
1981 	int b, len = r->res_ls->ls_lvblen;
1982 
1983 	/* b=1 lvb returned to caller
1984 	   b=0 lvb written to rsb or invalidated
1985 	   b=-1 do nothing */
1986 
1987 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1988 
1989 	if (b == 1) {
1990 		if (!lkb->lkb_lvbptr)
1991 			return;
1992 
1993 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1994 			return;
1995 
1996 		if (!r->res_lvbptr)
1997 			return;
1998 
1999 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
2000 		lkb->lkb_lvbseq = r->res_lvbseq;
2001 
2002 	} else if (b == 0) {
2003 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2004 			rsb_set_flag(r, RSB_VALNOTVALID);
2005 			return;
2006 		}
2007 
2008 		if (!lkb->lkb_lvbptr)
2009 			return;
2010 
2011 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2012 			return;
2013 
2014 		if (!r->res_lvbptr)
2015 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2016 
2017 		if (!r->res_lvbptr)
2018 			return;
2019 
2020 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
2021 		r->res_lvbseq++;
2022 		lkb->lkb_lvbseq = r->res_lvbseq;
2023 		rsb_clear_flag(r, RSB_VALNOTVALID);
2024 	}
2025 
2026 	if (rsb_flag(r, RSB_VALNOTVALID))
2027 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
2028 }
2029 
2030 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2031 {
2032 	if (lkb->lkb_grmode < DLM_LOCK_PW)
2033 		return;
2034 
2035 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2036 		rsb_set_flag(r, RSB_VALNOTVALID);
2037 		return;
2038 	}
2039 
2040 	if (!lkb->lkb_lvbptr)
2041 		return;
2042 
2043 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2044 		return;
2045 
2046 	if (!r->res_lvbptr)
2047 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2048 
2049 	if (!r->res_lvbptr)
2050 		return;
2051 
2052 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2053 	r->res_lvbseq++;
2054 	rsb_clear_flag(r, RSB_VALNOTVALID);
2055 }
2056 
2057 /* lkb is process copy (pc) */
2058 
2059 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2060 			    struct dlm_message *ms)
2061 {
2062 	int b;
2063 
2064 	if (!lkb->lkb_lvbptr)
2065 		return;
2066 
2067 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2068 		return;
2069 
2070 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2071 	if (b == 1) {
2072 		int len = receive_extralen(ms);
2073 		if (len > r->res_ls->ls_lvblen)
2074 			len = r->res_ls->ls_lvblen;
2075 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2076 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
2077 	}
2078 }
2079 
2080 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2081    remove_lock -- used for unlock, removes lkb from granted
2082    revert_lock -- used for cancel, moves lkb from convert to granted
2083    grant_lock  -- used for request and convert, adds lkb to granted or
2084                   moves lkb from convert or waiting to granted
2085 
2086    Each of these is used for master or local copy lkb's.  There is
2087    also a _pc() variation used to make the corresponding change on
2088    a process copy (pc) lkb. */
2089 
2090 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2091 {
2092 	del_lkb(r, lkb);
2093 	lkb->lkb_grmode = DLM_LOCK_IV;
2094 	/* this unhold undoes the original ref from create_lkb()
2095 	   so this leads to the lkb being freed */
2096 	unhold_lkb(lkb);
2097 }
2098 
/*
 * Remove a master or local copy lkb: apply the unlock-time lvb handling
 * to the rsb first, then remove the lkb.
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
2104 
/* Process copy variant of remove_lock(): no lvb handling, removal only. */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2109 
2110 /* returns: 0 did nothing
2111 	    1 moved lock to granted
2112 	   -1 removed lock */
2113 
2114 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2115 {
2116 	int rv = 0;
2117 
2118 	lkb->lkb_rqmode = DLM_LOCK_IV;
2119 
2120 	switch (lkb->lkb_status) {
2121 	case DLM_LKSTS_GRANTED:
2122 		break;
2123 	case DLM_LKSTS_CONVERT:
2124 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2125 		rv = 1;
2126 		break;
2127 	case DLM_LKSTS_WAITING:
2128 		del_lkb(r, lkb);
2129 		lkb->lkb_grmode = DLM_LOCK_IV;
2130 		/* this unhold undoes the original ref from create_lkb()
2131 		   so this leads to the lkb being freed */
2132 		unhold_lkb(lkb);
2133 		rv = -1;
2134 		break;
2135 	default:
2136 		log_print("invalid status for revert %d", lkb->lkb_status);
2137 	}
2138 	return rv;
2139 }
2140 
/*
 * Process copy variant of revert_lock(); currently identical, so the
 * return values are those of revert_lock().
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
2145 
2146 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147 {
2148 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2149 		lkb->lkb_grmode = lkb->lkb_rqmode;
2150 		if (lkb->lkb_status)
2151 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2152 		else
2153 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2154 	}
2155 
2156 	lkb->lkb_rqmode = DLM_LOCK_IV;
2157 	lkb->lkb_highbast = 0;
2158 }
2159 
/*
 * Grant a master or local copy lkb: do the lock-time lvb handling
 * against the rsb, then perform the grant itself.
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
}
2165 
/*
 * Grant a process copy lkb: the lvb handling uses message ms rather than
 * the rsb, then the grant itself is performed.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
2172 
/*
 * Called by grant_pending_locks(), which means an async grant message
 * must be sent to the requesting node in addition to granting the lock
 * if the lkb belongs to a remote node.  Otherwise a completion callback
 * with status 0 is queued.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
2185 
2186 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2187    change the granted/requested modes.  We're munging things accordingly in
2188    the process copy.
2189    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2190    conversion deadlock
2191    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2192    compatible with other granted locks */
2193 
2194 static void munge_demoted(struct dlm_lkb *lkb)
2195 {
2196 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2197 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2198 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2199 		return;
2200 	}
2201 
2202 	lkb->lkb_grmode = DLM_LOCK_NL;
2203 }
2204 
2205 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2206 {
2207 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2208 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2209 		log_print("munge_altmode %x invalid reply type %d",
2210 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2211 		return;
2212 	}
2213 
2214 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2215 		lkb->lkb_rqmode = DLM_LOCK_PR;
2216 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2217 		lkb->lkb_rqmode = DLM_LOCK_CW;
2218 	else {
2219 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2220 		dlm_print_lkb(lkb);
2221 	}
2222 }
2223 
2224 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2225 {
2226 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2227 					   lkb_statequeue);
2228 	if (lkb->lkb_id == first->lkb_id)
2229 		return 1;
2230 
2231 	return 0;
2232 }
2233 
2234 /* Check if the given lkb conflicts with another lkb on the queue. */
2235 
2236 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2237 {
2238 	struct dlm_lkb *this;
2239 
2240 	list_for_each_entry(this, head, lkb_statequeue) {
2241 		if (this == lkb)
2242 			continue;
2243 		if (!modes_compat(this, lkb))
2244 			return 1;
2245 	}
2246 	return 0;
2247 }
2248 
2249 /*
2250  * "A conversion deadlock arises with a pair of lock requests in the converting
2251  * queue for one resource.  The granted mode of each lock blocks the requested
2252  * mode of the other lock."
2253  *
2254  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2255  * convert queue from being granted, then deadlk/demote lkb.
2256  *
2257  * Example:
2258  * Granted Queue: empty
2259  * Convert Queue: NL->EX (first lock)
2260  *                PR->EX (second lock)
2261  *
2262  * The first lock can't be granted because of the granted mode of the second
2263  * lock and the second lock can't be granted because it's not first in the
2264  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2265  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2266  * flag set and return DEMOTED in the lksb flags.
2267  *
2268  * Originally, this function detected conv-deadlk in a more limited scope:
2269  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2270  * - if lkb1 was the first entry in the queue (not just earlier), and was
2271  *   blocked by the granted mode of lkb2, and there was nothing on the
2272  *   granted queue preventing lkb1 from being granted immediately, i.e.
2273  *   lkb2 was the only thing preventing lkb1 from being granted.
2274  *
2275  * That second condition meant we'd only say there was conv-deadlk if
2276  * resolving it (by demotion) would lead to the first lock on the convert
2277  * queue being granted right away.  It allowed conversion deadlocks to exist
2278  * between locks on the convert queue while they couldn't be granted anyway.
2279  *
2280  * Now, we detect and take action on conversion deadlocks immediately when
2281  * they're created, even if they may not be immediately consequential.  If
2282  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2283  * mode that would prevent lkb1's conversion from being granted, we do a
2284  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2285  * I think this means that the lkb_is_ahead condition below should always
2286  * be zero, i.e. there will never be conv-deadlk between two locks that are
2287  * both already on the convert queue.
2288  */
2289 
2290 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2291 {
2292 	struct dlm_lkb *lkb1;
2293 	int lkb_is_ahead = 0;
2294 
2295 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2296 		if (lkb1 == lkb2) {
2297 			lkb_is_ahead = 1;
2298 			continue;
2299 		}
2300 
2301 		if (!lkb_is_ahead) {
2302 			if (!modes_compat(lkb2, lkb1))
2303 				return 1;
2304 		} else {
2305 			if (!modes_compat(lkb2, lkb1) &&
2306 			    !modes_compat(lkb1, lkb2))
2307 				return 1;
2308 		}
2309 	}
2310 	return 0;
2311 }
2312 
2313 /*
2314  * Return 1 if the lock can be granted, 0 otherwise.
2315  * Also detect and resolve conversion deadlocks.
2316  *
2317  * lkb is the lock to be granted
2318  *
2319  * now is 1 if the function is being called in the context of the
2320  * immediate request, it is 0 if called later, after the lock has been
2321  * queued.
2322  *
2323  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2324  * after recovery.
2325  *
2326  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2327  */
2328 
2329 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2330 			   int recover)
2331 {
2332 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2333 
2334 	/*
2335 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2336 	 * a new request for a NL mode lock being blocked.
2337 	 *
2338 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2339 	 * request, then it would be granted.  In essence, the use of this flag
2340 	 * tells the Lock Manager to expedite theis request by not considering
2341 	 * what may be in the CONVERTING or WAITING queues...  As of this
2342 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2343 	 * mode locks.  This flag is not valid for conversion requests.
2344 	 *
2345 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2346 	 * conversion or used with a non-NL requested mode.  We also know an
2347 	 * EXPEDITE request is always granted immediately, so now must always
2348 	 * be 1.  The full condition to grant an expedite request: (now &&
2349 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2350 	 * therefore be shortened to just checking the flag.
2351 	 */
2352 
2353 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2354 		return 1;
2355 
2356 	/*
2357 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2358 	 * added to the remaining conditions.
2359 	 */
2360 
2361 	if (queue_conflict(&r->res_grantqueue, lkb))
2362 		return 0;
2363 
2364 	/*
2365 	 * 6-3: By default, a conversion request is immediately granted if the
2366 	 * requested mode is compatible with the modes of all other granted
2367 	 * locks
2368 	 */
2369 
2370 	if (queue_conflict(&r->res_convertqueue, lkb))
2371 		return 0;
2372 
2373 	/*
2374 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2375 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2376 	 * The lkb's may have been rebuilt on the queues in a different
2377 	 * order than they were in on the previous master.  So, granting
2378 	 * queued conversions in order after recovery doesn't make sense
2379 	 * since the order hasn't been preserved anyway.  The new order
2380 	 * could also have created a new "in place" conversion deadlock.
2381 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2382 	 * After recovery, there would be no granted locks, and possibly
2383 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2384 	 * recovery, grant conversions without considering order.
2385 	 */
2386 
2387 	if (conv && recover)
2388 		return 1;
2389 
2390 	/*
2391 	 * 6-5: But the default algorithm for deciding whether to grant or
2392 	 * queue conversion requests does not by itself guarantee that such
2393 	 * requests are serviced on a "first come first serve" basis.  This, in
2394 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2395 	 *
2396 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2397 	 * the system service employed to request a lock conversion.  This flag
2398 	 * forces certain conversion requests to be queued, even if they are
2399 	 * compatible with the granted modes of other locks on the same
2400 	 * resource.  Thus, the use of this flag results in conversion requests
2401 	 * being ordered on a "first come first servce" basis.
2402 	 *
2403 	 * DCT: This condition is all about new conversions being able to occur
2404 	 * "in place" while the lock remains on the granted queue (assuming
2405 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2406 	 * doesn't _have_ to go onto the convert queue where it's processed in
2407 	 * order.  The "now" variable is necessary to distinguish converts
2408 	 * being received and processed for the first time now, because once a
2409 	 * convert is moved to the conversion queue the condition below applies
2410 	 * requiring fifo granting.
2411 	 */
2412 
2413 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2414 		return 1;
2415 
2416 	/*
2417 	 * Even if the convert is compat with all granted locks,
2418 	 * QUECVT forces it behind other locks on the convert queue.
2419 	 */
2420 
2421 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2422 		if (list_empty(&r->res_convertqueue))
2423 			return 1;
2424 		else
2425 			return 0;
2426 	}
2427 
2428 	/*
2429 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2430 	 * order.
2431 	 */
2432 
2433 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2434 		return 1;
2435 
2436 	/*
2437 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2438 	 * granted until all other conversion requests ahead of it are granted
2439 	 * and/or canceled.
2440 	 */
2441 
2442 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2443 		return 1;
2444 
2445 	/*
2446 	 * 6-4: By default, a new request is immediately granted only if all
2447 	 * three of the following conditions are satisfied when the request is
2448 	 * issued:
2449 	 * - The queue of ungranted conversion requests for the resource is
2450 	 *   empty.
2451 	 * - The queue of ungranted new requests for the resource is empty.
2452 	 * - The mode of the new request is compatible with the most
2453 	 *   restrictive mode of all granted locks on the resource.
2454 	 */
2455 
2456 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2457 	    list_empty(&r->res_waitqueue))
2458 		return 1;
2459 
2460 	/*
2461 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2462 	 * it cannot be granted until the queue of ungranted conversion
2463 	 * requests is empty, all ungranted new requests ahead of it are
2464 	 * granted and/or canceled, and it is compatible with the granted mode
2465 	 * of the most restrictive lock granted on the resource.
2466 	 */
2467 
2468 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2469 	    first_in_list(lkb, &r->res_waitqueue))
2470 		return 1;
2471 
2472 	return 0;
2473 }
2474 
2475 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2476 			  int recover, int *err)
2477 {
2478 	int rv;
2479 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2480 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2481 
2482 	if (err)
2483 		*err = 0;
2484 
2485 	rv = _can_be_granted(r, lkb, now, recover);
2486 	if (rv)
2487 		goto out;
2488 
2489 	/*
2490 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2491 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2492 	 * cancels one of the locks.
2493 	 */
2494 
2495 	if (is_convert && can_be_queued(lkb) &&
2496 	    conversion_deadlock_detect(r, lkb)) {
2497 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2498 			lkb->lkb_grmode = DLM_LOCK_NL;
2499 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2500 		} else if (err) {
2501 			*err = -EDEADLK;
2502 		} else {
2503 			log_print("can_be_granted deadlock %x now %d",
2504 				  lkb->lkb_id, now);
2505 			dlm_dump_rsb(r);
2506 		}
2507 		goto out;
2508 	}
2509 
2510 	/*
2511 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2512 	 * to grant a request in a mode other than the normal rqmode.  It's a
2513 	 * simple way to provide a big optimization to applications that can
2514 	 * use them.
2515 	 */
2516 
2517 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2518 		alt = DLM_LOCK_PR;
2519 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2520 		alt = DLM_LOCK_CW;
2521 
2522 	if (alt) {
2523 		lkb->lkb_rqmode = alt;
2524 		rv = _can_be_granted(r, lkb, now, 0);
2525 		if (rv)
2526 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2527 		else
2528 			lkb->lkb_rqmode = rqmode;
2529 	}
2530  out:
2531 	return rv;
2532 }
2533 
2534 /* Returns the highest requested mode of all blocked conversions; sets
2535    cw if there's a blocked conversion to DLM_LOCK_CW. */
2536 
2537 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2538 				 unsigned int *count)
2539 {
2540 	struct dlm_lkb *lkb, *s;
2541 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2542 	int hi, demoted, quit, grant_restart, demote_restart;
2543 	int deadlk;
2544 
2545 	quit = 0;
2546  restart:
2547 	grant_restart = 0;
2548 	demote_restart = 0;
2549 	hi = DLM_LOCK_IV;
2550 
2551 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2552 		demoted = is_demoted(lkb);
2553 		deadlk = 0;
2554 
2555 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2556 			grant_lock_pending(r, lkb);
2557 			grant_restart = 1;
2558 			if (count)
2559 				(*count)++;
2560 			continue;
2561 		}
2562 
2563 		if (!demoted && is_demoted(lkb)) {
2564 			log_print("WARN: pending demoted %x node %d %s",
2565 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2566 			demote_restart = 1;
2567 			continue;
2568 		}
2569 
2570 		if (deadlk) {
2571 			/*
2572 			 * If DLM_LKB_NODLKWT flag is set and conversion
2573 			 * deadlock is detected, we request blocking AST and
2574 			 * down (or cancel) conversion.
2575 			 */
2576 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2577 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2578 					queue_bast(r, lkb, lkb->lkb_rqmode);
2579 					lkb->lkb_highbast = lkb->lkb_rqmode;
2580 				}
2581 			} else {
2582 				log_print("WARN: pending deadlock %x node %d %s",
2583 					  lkb->lkb_id, lkb->lkb_nodeid,
2584 					  r->res_name);
2585 				dlm_dump_rsb(r);
2586 			}
2587 			continue;
2588 		}
2589 
2590 		hi = max_t(int, lkb->lkb_rqmode, hi);
2591 
2592 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2593 			*cw = 1;
2594 	}
2595 
2596 	if (grant_restart)
2597 		goto restart;
2598 	if (demote_restart && !quit) {
2599 		quit = 1;
2600 		goto restart;
2601 	}
2602 
2603 	return max_t(int, high, hi);
2604 }
2605 
2606 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2607 			      unsigned int *count)
2608 {
2609 	struct dlm_lkb *lkb, *s;
2610 
2611 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2612 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2613 			grant_lock_pending(r, lkb);
2614 			if (count)
2615 				(*count)++;
2616 		} else {
2617 			high = max_t(int, lkb->lkb_rqmode, high);
2618 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2619 				*cw = 1;
2620 		}
2621 	}
2622 
2623 	return high;
2624 }
2625 
2626 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2627    on either the convert or waiting queue.
2628    high is the largest rqmode of all locks blocked on the convert or
2629    waiting queue. */
2630 
2631 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2632 {
2633 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2634 		if (gr->lkb_highbast < DLM_LOCK_EX)
2635 			return 1;
2636 		return 0;
2637 	}
2638 
2639 	if (gr->lkb_highbast < high &&
2640 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2641 		return 1;
2642 	return 0;
2643 }
2644 
2645 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2646 {
2647 	struct dlm_lkb *lkb, *s;
2648 	int high = DLM_LOCK_IV;
2649 	int cw = 0;
2650 
2651 	if (!is_master(r)) {
2652 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2653 		dlm_dump_rsb(r);
2654 		return;
2655 	}
2656 
2657 	high = grant_pending_convert(r, high, &cw, count);
2658 	high = grant_pending_wait(r, high, &cw, count);
2659 
2660 	if (high == DLM_LOCK_IV)
2661 		return;
2662 
2663 	/*
2664 	 * If there are locks left on the wait/convert queue then send blocking
2665 	 * ASTs to granted locks based on the largest requested mode (high)
2666 	 * found above.
2667 	 */
2668 
2669 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2670 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2671 			if (cw && high == DLM_LOCK_PR &&
2672 			    lkb->lkb_grmode == DLM_LOCK_PR)
2673 				queue_bast(r, lkb, DLM_LOCK_CW);
2674 			else
2675 				queue_bast(r, lkb, high);
2676 			lkb->lkb_highbast = high;
2677 		}
2678 	}
2679 }
2680 
2681 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2682 {
2683 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2684 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2685 		if (gr->lkb_highbast < DLM_LOCK_EX)
2686 			return 1;
2687 		return 0;
2688 	}
2689 
2690 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2691 		return 1;
2692 	return 0;
2693 }
2694 
2695 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2696 			    struct dlm_lkb *lkb)
2697 {
2698 	struct dlm_lkb *gr;
2699 
2700 	list_for_each_entry(gr, head, lkb_statequeue) {
2701 		/* skip self when sending basts to convertqueue */
2702 		if (gr == lkb)
2703 			continue;
2704 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2705 			queue_bast(r, gr, lkb->lkb_rqmode);
2706 			gr->lkb_highbast = lkb->lkb_rqmode;
2707 		}
2708 	}
2709 }
2710 
2711 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2712 {
2713 	send_bast_queue(r, &r->res_grantqueue, lkb);
2714 }
2715 
2716 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2717 {
2718 	send_bast_queue(r, &r->res_grantqueue, lkb);
2719 	send_bast_queue(r, &r->res_convertqueue, lkb);
2720 }
2721 
2722 /* set_master(r, lkb) -- set the master nodeid of a resource
2723 
2724    The purpose of this function is to set the nodeid field in the given
2725    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2726    known, it can just be copied to the lkb and the function will return
2727    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2728    before it can be copied to the lkb.
2729 
2730    When the rsb nodeid is being looked up remotely, the initial lkb
2731    causing the lookup is kept on the ls_waiters list waiting for the
2732    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2733    on the rsb's res_lookup list until the master is verified.
2734 
2735    Return values:
2736    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2737    1: the rsb master is not available and the lkb has been placed on
2738       a wait queue
2739 */
2740 
2741 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2742 {
2743 	int our_nodeid = dlm_our_nodeid();
2744 
2745 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2746 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2747 		r->res_first_lkid = lkb->lkb_id;
2748 		lkb->lkb_nodeid = r->res_nodeid;
2749 		return 0;
2750 	}
2751 
2752 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2753 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2754 		return 1;
2755 	}
2756 
2757 	if (r->res_master_nodeid == our_nodeid) {
2758 		lkb->lkb_nodeid = 0;
2759 		return 0;
2760 	}
2761 
2762 	if (r->res_master_nodeid) {
2763 		lkb->lkb_nodeid = r->res_master_nodeid;
2764 		return 0;
2765 	}
2766 
2767 	if (dlm_dir_nodeid(r) == our_nodeid) {
2768 		/* This is a somewhat unusual case; find_rsb will usually
2769 		   have set res_master_nodeid when dir nodeid is local, but
2770 		   there are cases where we become the dir node after we've
2771 		   past find_rsb and go through _request_lock again.
2772 		   confirm_master() or process_lookup_list() needs to be
2773 		   called after this. */
2774 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2775 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2776 			  r->res_name);
2777 		r->res_master_nodeid = our_nodeid;
2778 		r->res_nodeid = 0;
2779 		lkb->lkb_nodeid = 0;
2780 		return 0;
2781 	}
2782 
2783 	wait_pending_remove(r);
2784 
2785 	r->res_first_lkid = lkb->lkb_id;
2786 	send_lookup(r, lkb);
2787 	return 1;
2788 }
2789 
2790 static void process_lookup_list(struct dlm_rsb *r)
2791 {
2792 	struct dlm_lkb *lkb, *safe;
2793 
2794 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2795 		list_del_init(&lkb->lkb_rsb_lookup);
2796 		_request_lock(r, lkb);
2797 		schedule();
2798 	}
2799 }
2800 
2801 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2802 
2803 static void confirm_master(struct dlm_rsb *r, int error)
2804 {
2805 	struct dlm_lkb *lkb;
2806 
2807 	if (!r->res_first_lkid)
2808 		return;
2809 
2810 	switch (error) {
2811 	case 0:
2812 	case -EINPROGRESS:
2813 		r->res_first_lkid = 0;
2814 		process_lookup_list(r);
2815 		break;
2816 
2817 	case -EAGAIN:
2818 	case -EBADR:
2819 	case -ENOTBLK:
2820 		/* the remote request failed and won't be retried (it was
2821 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2822 		   lkb the first_lkid */
2823 
2824 		r->res_first_lkid = 0;
2825 
2826 		if (!list_empty(&r->res_lookup)) {
2827 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2828 					 lkb_rsb_lookup);
2829 			list_del_init(&lkb->lkb_rsb_lookup);
2830 			r->res_first_lkid = lkb->lkb_id;
2831 			_request_lock(r, lkb);
2832 		}
2833 		break;
2834 
2835 	default:
2836 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2837 	}
2838 }
2839 
2840 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2841 			 int namelen, unsigned long timeout_cs,
2842 			 void (*ast) (void *astparam),
2843 			 void *astparam,
2844 			 void (*bast) (void *astparam, int mode),
2845 			 struct dlm_args *args)
2846 {
2847 	int rv = -EINVAL;
2848 
2849 	/* check for invalid arg usage */
2850 
2851 	if (mode < 0 || mode > DLM_LOCK_EX)
2852 		goto out;
2853 
2854 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2855 		goto out;
2856 
2857 	if (flags & DLM_LKF_CANCEL)
2858 		goto out;
2859 
2860 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2861 		goto out;
2862 
2863 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2864 		goto out;
2865 
2866 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2867 		goto out;
2868 
2869 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2870 		goto out;
2871 
2872 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2873 		goto out;
2874 
2875 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2876 		goto out;
2877 
2878 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2879 		goto out;
2880 
2881 	if (!ast || !lksb)
2882 		goto out;
2883 
2884 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2885 		goto out;
2886 
2887 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2888 		goto out;
2889 
2890 	/* these args will be copied to the lkb in validate_lock_args,
2891 	   it cannot be done now because when converting locks, fields in
2892 	   an active lkb cannot be modified before locking the rsb */
2893 
2894 	args->flags = flags;
2895 	args->astfn = ast;
2896 	args->astparam = astparam;
2897 	args->bastfn = bast;
2898 	args->timeout = timeout_cs;
2899 	args->mode = mode;
2900 	args->lksb = lksb;
2901 	rv = 0;
2902  out:
2903 	return rv;
2904 }
2905 
2906 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2907 {
2908 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2909  		      DLM_LKF_FORCEUNLOCK))
2910 		return -EINVAL;
2911 
2912 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2913 		return -EINVAL;
2914 
2915 	args->flags = flags;
2916 	args->astparam = astarg;
2917 	return 0;
2918 }
2919 
2920 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2921 			      struct dlm_args *args)
2922 {
2923 	int rv = -EINVAL;
2924 
2925 	if (args->flags & DLM_LKF_CONVERT) {
2926 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2927 			goto out;
2928 
2929 		if (args->flags & DLM_LKF_QUECVT &&
2930 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2931 			goto out;
2932 
2933 		rv = -EBUSY;
2934 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2935 			goto out;
2936 
2937 		/* lock not allowed if there's any op in progress */
2938 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2939 			goto out;
2940 
2941 		if (is_overlap(lkb))
2942 			goto out;
2943 	}
2944 
2945 	lkb->lkb_exflags = args->flags;
2946 	lkb->lkb_sbflags = 0;
2947 	lkb->lkb_astfn = args->astfn;
2948 	lkb->lkb_astparam = args->astparam;
2949 	lkb->lkb_bastfn = args->bastfn;
2950 	lkb->lkb_rqmode = args->mode;
2951 	lkb->lkb_lksb = args->lksb;
2952 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2953 	lkb->lkb_ownpid = (int) current->pid;
2954 	lkb->lkb_timeout_cs = args->timeout;
2955 	rv = 0;
2956  out:
2957 	if (rv)
2958 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2959 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2960 			  lkb->lkb_status, lkb->lkb_wait_type,
2961 			  lkb->lkb_resource->res_name);
2962 	return rv;
2963 }
2964 
2965 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2966    for success */
2967 
2968 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2969    because there may be a lookup in progress and it's valid to do
2970    cancel/unlockf on it */
2971 
2972 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2973 {
2974 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2975 	int rv = -EINVAL;
2976 
2977 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2978 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2979 		dlm_print_lkb(lkb);
2980 		goto out;
2981 	}
2982 
2983 	/* an lkb may still exist even though the lock is EOL'ed due to a
2984 	   cancel, unlock or failed noqueue request; an app can't use these
2985 	   locks; return same error as if the lkid had not been found at all */
2986 
2987 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2988 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2989 		rv = -ENOENT;
2990 		goto out;
2991 	}
2992 
2993 	/* an lkb may be waiting for an rsb lookup to complete where the
2994 	   lookup was initiated by another lock */
2995 
2996 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2997 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2998 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2999 			list_del_init(&lkb->lkb_rsb_lookup);
3000 			queue_cast(lkb->lkb_resource, lkb,
3001 				   args->flags & DLM_LKF_CANCEL ?
3002 				   -DLM_ECANCEL : -DLM_EUNLOCK);
3003 			unhold_lkb(lkb); /* undoes create_lkb() */
3004 		}
3005 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
3006 		rv = -EBUSY;
3007 		goto out;
3008 	}
3009 
3010 	/* cancel not allowed with another cancel/unlock in progress */
3011 
3012 	if (args->flags & DLM_LKF_CANCEL) {
3013 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
3014 			goto out;
3015 
3016 		if (is_overlap(lkb))
3017 			goto out;
3018 
3019 		/* don't let scand try to do a cancel */
3020 		del_timeout(lkb);
3021 
3022 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3023 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3024 			rv = -EBUSY;
3025 			goto out;
3026 		}
3027 
3028 		/* there's nothing to cancel */
3029 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
3030 		    !lkb->lkb_wait_type) {
3031 			rv = -EBUSY;
3032 			goto out;
3033 		}
3034 
3035 		switch (lkb->lkb_wait_type) {
3036 		case DLM_MSG_LOOKUP:
3037 		case DLM_MSG_REQUEST:
3038 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3039 			rv = -EBUSY;
3040 			goto out;
3041 		case DLM_MSG_UNLOCK:
3042 		case DLM_MSG_CANCEL:
3043 			goto out;
3044 		}
3045 		/* add_to_waiters() will set OVERLAP_CANCEL */
3046 		goto out_ok;
3047 	}
3048 
3049 	/* do we need to allow a force-unlock if there's a normal unlock
3050 	   already in progress?  in what conditions could the normal unlock
3051 	   fail such that we'd want to send a force-unlock to be sure? */
3052 
3053 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
3054 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3055 			goto out;
3056 
3057 		if (is_overlap_unlock(lkb))
3058 			goto out;
3059 
3060 		/* don't let scand try to do a cancel */
3061 		del_timeout(lkb);
3062 
3063 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3064 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3065 			rv = -EBUSY;
3066 			goto out;
3067 		}
3068 
3069 		switch (lkb->lkb_wait_type) {
3070 		case DLM_MSG_LOOKUP:
3071 		case DLM_MSG_REQUEST:
3072 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3073 			rv = -EBUSY;
3074 			goto out;
3075 		case DLM_MSG_UNLOCK:
3076 			goto out;
3077 		}
3078 		/* add_to_waiters() will set OVERLAP_UNLOCK */
3079 		goto out_ok;
3080 	}
3081 
3082 	/* normal unlock not allowed if there's any op in progress */
3083 	rv = -EBUSY;
3084 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3085 		goto out;
3086 
3087  out_ok:
3088 	/* an overlapping op shouldn't blow away exflags from other op */
3089 	lkb->lkb_exflags |= args->flags;
3090 	lkb->lkb_sbflags = 0;
3091 	lkb->lkb_astparam = args->astparam;
3092 	rv = 0;
3093  out:
3094 	if (rv)
3095 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3096 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3097 			  args->flags, lkb->lkb_wait_type,
3098 			  lkb->lkb_resource->res_name);
3099 	return rv;
3100 }
3101 
3102 /*
3103  * Four stage 4 varieties:
3104  * do_request(), do_convert(), do_unlock(), do_cancel()
3105  * These are called on the master node for the given lock and
3106  * from the central locking logic.
3107  */
3108 
3109 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3110 {
3111 	int error = 0;
3112 
3113 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3114 		grant_lock(r, lkb);
3115 		queue_cast(r, lkb, 0);
3116 		goto out;
3117 	}
3118 
3119 	if (can_be_queued(lkb)) {
3120 		error = -EINPROGRESS;
3121 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3122 		add_timeout(lkb);
3123 		goto out;
3124 	}
3125 
3126 	error = -EAGAIN;
3127 	queue_cast(r, lkb, -EAGAIN);
3128  out:
3129 	return error;
3130 }
3131 
3132 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3133 			       int error)
3134 {
3135 	switch (error) {
3136 	case -EAGAIN:
3137 		if (force_blocking_asts(lkb))
3138 			send_blocking_asts_all(r, lkb);
3139 		break;
3140 	case -EINPROGRESS:
3141 		send_blocking_asts(r, lkb);
3142 		break;
3143 	}
3144 }
3145 
3146 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3147 {
3148 	int error = 0;
3149 	int deadlk = 0;
3150 
3151 	/* changing an existing lock may allow others to be granted */
3152 
3153 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3154 		grant_lock(r, lkb);
3155 		queue_cast(r, lkb, 0);
3156 		goto out;
3157 	}
3158 
3159 	/* can_be_granted() detected that this lock would block in a conversion
3160 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3161 	   the ast for the convert. */
3162 
3163 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3164 		/* it's left on the granted queue */
3165 		revert_lock(r, lkb);
3166 		queue_cast(r, lkb, -EDEADLK);
3167 		error = -EDEADLK;
3168 		goto out;
3169 	}
3170 
3171 	/* is_demoted() means the can_be_granted() above set the grmode
3172 	   to NL, and left us on the granted queue.  This auto-demotion
3173 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3174 	   now grantable.  We have to try to grant other converting locks
3175 	   before we try again to grant this one. */
3176 
3177 	if (is_demoted(lkb)) {
3178 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3179 		if (_can_be_granted(r, lkb, 1, 0)) {
3180 			grant_lock(r, lkb);
3181 			queue_cast(r, lkb, 0);
3182 			goto out;
3183 		}
3184 		/* else fall through and move to convert queue */
3185 	}
3186 
3187 	if (can_be_queued(lkb)) {
3188 		error = -EINPROGRESS;
3189 		del_lkb(r, lkb);
3190 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3191 		add_timeout(lkb);
3192 		goto out;
3193 	}
3194 
3195 	error = -EAGAIN;
3196 	queue_cast(r, lkb, -EAGAIN);
3197  out:
3198 	return error;
3199 }
3200 
3201 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3202 			       int error)
3203 {
3204 	switch (error) {
3205 	case 0:
3206 		grant_pending_locks(r, NULL);
3207 		/* grant_pending_locks also sends basts */
3208 		break;
3209 	case -EAGAIN:
3210 		if (force_blocking_asts(lkb))
3211 			send_blocking_asts_all(r, lkb);
3212 		break;
3213 	case -EINPROGRESS:
3214 		send_blocking_asts(r, lkb);
3215 		break;
3216 	}
3217 }
3218 
3219 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3220 {
3221 	remove_lock(r, lkb);
3222 	queue_cast(r, lkb, -DLM_EUNLOCK);
3223 	return -DLM_EUNLOCK;
3224 }
3225 
3226 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3227 			      int error)
3228 {
3229 	grant_pending_locks(r, NULL);
3230 }
3231 
3232 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3233 
3234 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235 {
3236 	int error;
3237 
3238 	error = revert_lock(r, lkb);
3239 	if (error) {
3240 		queue_cast(r, lkb, -DLM_ECANCEL);
3241 		return -DLM_ECANCEL;
3242 	}
3243 	return 0;
3244 }
3245 
3246 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3247 			      int error)
3248 {
3249 	if (error)
3250 		grant_pending_locks(r, NULL);
3251 }
3252 
3253 /*
3254  * Four stage 3 varieties:
3255  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3256  */
3257 
/* add a new lkb to a possibly new rsb, called by requesting process;
   sends the request to the master when the rsb is remote, otherwise runs
   do_request() and its effects locally */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;

	/* a positive result means there is nothing more to do here for
	   now; it is reported to the caller as success */
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	rv = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, rv);
	return rv;
}
3286 
/* change some property of an existing lkb, e.g. mode; sent to the master
   when the rsb is remote, otherwise handled locally */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	rv = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, rv);
	return rv;
}
3305 
/* remove an existing lkb from the granted queue; sent to the master when
   the rsb is remote, otherwise handled locally */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	rv = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, rv);
	return rv;
}
3324 
/* remove an existing lkb from the convert or wait queue; sent to the
   master when the rsb is remote, otherwise handled locally */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	rv = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, rv);
	return rv;
}
3343 
3344 /*
3345  * Four stage 2 varieties:
3346  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3347  */
3348 
3349 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3350 			int len, struct dlm_args *args)
3351 {
3352 	struct dlm_rsb *r;
3353 	int error;
3354 
3355 	error = validate_lock_args(ls, lkb, args);
3356 	if (error)
3357 		return error;
3358 
3359 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3360 	if (error)
3361 		return error;
3362 
3363 	lock_rsb(r);
3364 
3365 	attach_lkb(r, lkb);
3366 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3367 
3368 	error = _request_lock(r, lkb);
3369 
3370 	unlock_rsb(r);
3371 	put_rsb(r);
3372 	return error;
3373 }
3374 
3375 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3376 			struct dlm_args *args)
3377 {
3378 	struct dlm_rsb *r;
3379 	int error;
3380 
3381 	r = lkb->lkb_resource;
3382 
3383 	hold_rsb(r);
3384 	lock_rsb(r);
3385 
3386 	error = validate_lock_args(ls, lkb, args);
3387 	if (error)
3388 		goto out;
3389 
3390 	error = _convert_lock(r, lkb);
3391  out:
3392 	unlock_rsb(r);
3393 	put_rsb(r);
3394 	return error;
3395 }
3396 
3397 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3398 		       struct dlm_args *args)
3399 {
3400 	struct dlm_rsb *r;
3401 	int error;
3402 
3403 	r = lkb->lkb_resource;
3404 
3405 	hold_rsb(r);
3406 	lock_rsb(r);
3407 
3408 	error = validate_unlock_args(lkb, args);
3409 	if (error)
3410 		goto out;
3411 
3412 	error = _unlock_lock(r, lkb);
3413  out:
3414 	unlock_rsb(r);
3415 	put_rsb(r);
3416 	return error;
3417 }
3418 
3419 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3420 		       struct dlm_args *args)
3421 {
3422 	struct dlm_rsb *r;
3423 	int error;
3424 
3425 	r = lkb->lkb_resource;
3426 
3427 	hold_rsb(r);
3428 	lock_rsb(r);
3429 
3430 	error = validate_unlock_args(lkb, args);
3431 	if (error)
3432 		goto out;
3433 
3434 	error = _cancel_lock(r, lkb);
3435  out:
3436 	unlock_rsb(r);
3437 	put_rsb(r);
3438 	return error;
3439 }
3440 
3441 /*
3442  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3443  */
3444 
3445 int dlm_lock(dlm_lockspace_t *lockspace,
3446 	     int mode,
3447 	     struct dlm_lksb *lksb,
3448 	     uint32_t flags,
3449 	     void *name,
3450 	     unsigned int namelen,
3451 	     uint32_t parent_lkid,
3452 	     void (*ast) (void *astarg),
3453 	     void *astarg,
3454 	     void (*bast) (void *astarg, int mode))
3455 {
3456 	struct dlm_ls *ls;
3457 	struct dlm_lkb *lkb;
3458 	struct dlm_args args;
3459 	int error, convert = flags & DLM_LKF_CONVERT;
3460 
3461 	ls = dlm_find_lockspace_local(lockspace);
3462 	if (!ls)
3463 		return -EINVAL;
3464 
3465 	dlm_lock_recovery(ls);
3466 
3467 	if (convert)
3468 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3469 	else
3470 		error = create_lkb(ls, &lkb);
3471 
3472 	if (error)
3473 		goto out;
3474 
3475 	trace_dlm_lock_start(ls, lkb, mode, flags);
3476 
3477 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3478 			      astarg, bast, &args);
3479 	if (error)
3480 		goto out_put;
3481 
3482 	if (convert)
3483 		error = convert_lock(ls, lkb, &args);
3484 	else
3485 		error = request_lock(ls, lkb, name, namelen, &args);
3486 
3487 	if (error == -EINPROGRESS)
3488 		error = 0;
3489  out_put:
3490 	trace_dlm_lock_end(ls, lkb, mode, flags, error);
3491 
3492 	if (convert || error)
3493 		__put_lkb(ls, lkb);
3494 	if (error == -EAGAIN || error == -EDEADLK)
3495 		error = 0;
3496  out:
3497 	dlm_unlock_recovery(ls);
3498 	dlm_put_lockspace(ls);
3499 	return error;
3500 }
3501 
3502 int dlm_unlock(dlm_lockspace_t *lockspace,
3503 	       uint32_t lkid,
3504 	       uint32_t flags,
3505 	       struct dlm_lksb *lksb,
3506 	       void *astarg)
3507 {
3508 	struct dlm_ls *ls;
3509 	struct dlm_lkb *lkb;
3510 	struct dlm_args args;
3511 	int error;
3512 
3513 	ls = dlm_find_lockspace_local(lockspace);
3514 	if (!ls)
3515 		return -EINVAL;
3516 
3517 	dlm_lock_recovery(ls);
3518 
3519 	error = find_lkb(ls, lkid, &lkb);
3520 	if (error)
3521 		goto out;
3522 
3523 	trace_dlm_unlock_start(ls, lkb, flags);
3524 
3525 	error = set_unlock_args(flags, astarg, &args);
3526 	if (error)
3527 		goto out_put;
3528 
3529 	if (flags & DLM_LKF_CANCEL)
3530 		error = cancel_lock(ls, lkb, &args);
3531 	else
3532 		error = unlock_lock(ls, lkb, &args);
3533 
3534 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3535 		error = 0;
3536 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3537 		error = 0;
3538  out_put:
3539 	trace_dlm_unlock_end(ls, lkb, flags, error);
3540 
3541 	dlm_put_lkb(lkb);
3542  out:
3543 	dlm_unlock_recovery(ls);
3544 	dlm_put_lockspace(ls);
3545 	return error;
3546 }
3547 
3548 /*
3549  * send/receive routines for remote operations and replies
3550  *
3551  * send_args
3552  * send_common
3553  * send_request			receive_request
3554  * send_convert			receive_convert
3555  * send_unlock			receive_unlock
3556  * send_cancel			receive_cancel
3557  * send_grant			receive_grant
3558  * send_bast			receive_bast
3559  * send_lookup			receive_lookup
3560  * send_remove			receive_remove
3561  *
3562  * 				send_common_reply
3563  * receive_request_reply	send_request_reply
3564  * receive_convert_reply	send_convert_reply
3565  * receive_unlock_reply		send_unlock_reply
3566  * receive_cancel_reply		send_cancel_reply
3567  * receive_lookup_reply		send_lookup_reply
3568  */
3569 
3570 static int _create_message(struct dlm_ls *ls, int mb_len,
3571 			   int to_nodeid, int mstype,
3572 			   struct dlm_message **ms_ret,
3573 			   struct dlm_mhandle **mh_ret)
3574 {
3575 	struct dlm_message *ms;
3576 	struct dlm_mhandle *mh;
3577 	char *mb;
3578 
3579 	/* get_buffer gives us a message handle (mh) that we need to
3580 	   pass into midcomms_commit and a message buffer (mb) that we
3581 	   write our data into */
3582 
3583 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
3584 	if (!mh)
3585 		return -ENOBUFS;
3586 
3587 	ms = (struct dlm_message *) mb;
3588 
3589 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3590 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3591 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3592 	ms->m_header.h_length = cpu_to_le16(mb_len);
3593 	ms->m_header.h_cmd = DLM_MSG;
3594 
3595 	ms->m_type = cpu_to_le32(mstype);
3596 
3597 	*mh_ret = mh;
3598 	*ms_ret = ms;
3599 	return 0;
3600 }
3601 
3602 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3603 			  int to_nodeid, int mstype,
3604 			  struct dlm_message **ms_ret,
3605 			  struct dlm_mhandle **mh_ret)
3606 {
3607 	int mb_len = sizeof(struct dlm_message);
3608 
3609 	switch (mstype) {
3610 	case DLM_MSG_REQUEST:
3611 	case DLM_MSG_LOOKUP:
3612 	case DLM_MSG_REMOVE:
3613 		mb_len += r->res_length;
3614 		break;
3615 	case DLM_MSG_CONVERT:
3616 	case DLM_MSG_UNLOCK:
3617 	case DLM_MSG_REQUEST_REPLY:
3618 	case DLM_MSG_CONVERT_REPLY:
3619 	case DLM_MSG_GRANT:
3620 		if (lkb && lkb->lkb_lvbptr)
3621 			mb_len += r->res_ls->ls_lvblen;
3622 		break;
3623 	}
3624 
3625 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3626 			       ms_ret, mh_ret);
3627 }
3628 
/* Commit a prepared message for sending.  ms is not used and the result
   is always 0; further lowcomms enhancements or alternate implementations
   may make the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_midcomms_commit_mhandle(mh);

	return 0;
}
3637 
3638 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3639 		      struct dlm_message *ms)
3640 {
3641 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3642 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3643 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3644 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3645 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3646 	ms->m_sbflags  = cpu_to_le32(lkb->lkb_sbflags);
3647 	ms->m_flags    = cpu_to_le32(lkb->lkb_flags);
3648 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3649 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3650 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3651 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3652 	ms->m_hash     = cpu_to_le32(r->res_hash);
3653 
3654 	/* m_result and m_bastmode are set from function args,
3655 	   not from lkb fields */
3656 
3657 	if (lkb->lkb_bastfn)
3658 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3659 	if (lkb->lkb_astfn)
3660 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3661 
3662 	/* compare with switch in create_message; send_remove() doesn't
3663 	   use send_args() */
3664 
3665 	switch (ms->m_type) {
3666 	case cpu_to_le32(DLM_MSG_REQUEST):
3667 	case cpu_to_le32(DLM_MSG_LOOKUP):
3668 		memcpy(ms->m_extra, r->res_name, r->res_length);
3669 		break;
3670 	case cpu_to_le32(DLM_MSG_CONVERT):
3671 	case cpu_to_le32(DLM_MSG_UNLOCK):
3672 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3673 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3674 	case cpu_to_le32(DLM_MSG_GRANT):
3675 		if (!lkb->lkb_lvbptr)
3676 			break;
3677 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3678 		break;
3679 	}
3680 }
3681 
3682 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3683 {
3684 	struct dlm_message *ms;
3685 	struct dlm_mhandle *mh;
3686 	int to_nodeid, error;
3687 
3688 	to_nodeid = r->res_nodeid;
3689 
3690 	error = add_to_waiters(lkb, mstype, to_nodeid);
3691 	if (error)
3692 		return error;
3693 
3694 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3695 	if (error)
3696 		goto fail;
3697 
3698 	send_args(r, lkb, ms);
3699 
3700 	error = send_message(mh, ms);
3701 	if (error)
3702 		goto fail;
3703 	return 0;
3704 
3705  fail:
3706 	remove_from_waiters(lkb, msg_reply_type(mstype));
3707 	return error;
3708 }
3709 
3710 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3711 {
3712 	return send_common(r, lkb, DLM_MSG_REQUEST);
3713 }
3714 
3715 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3716 {
3717 	int error;
3718 
3719 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3720 
3721 	/* down conversions go without a reply from the master */
3722 	if (!error && down_conversion(lkb)) {
3723 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3724 		r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
3725 		r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3726 		r->res_ls->ls_stub_ms.m_result = 0;
3727 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3728 	}
3729 
3730 	return error;
3731 }
3732 
3733 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3734    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3735    that the master is still correct. */
3736 
3737 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3738 {
3739 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3740 }
3741 
3742 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3743 {
3744 	return send_common(r, lkb, DLM_MSG_CANCEL);
3745 }
3746 
3747 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3748 {
3749 	struct dlm_message *ms;
3750 	struct dlm_mhandle *mh;
3751 	int to_nodeid, error;
3752 
3753 	to_nodeid = lkb->lkb_nodeid;
3754 
3755 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3756 	if (error)
3757 		goto out;
3758 
3759 	send_args(r, lkb, ms);
3760 
3761 	ms->m_result = 0;
3762 
3763 	error = send_message(mh, ms);
3764  out:
3765 	return error;
3766 }
3767 
3768 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3769 {
3770 	struct dlm_message *ms;
3771 	struct dlm_mhandle *mh;
3772 	int to_nodeid, error;
3773 
3774 	to_nodeid = lkb->lkb_nodeid;
3775 
3776 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3777 	if (error)
3778 		goto out;
3779 
3780 	send_args(r, lkb, ms);
3781 
3782 	ms->m_bastmode = cpu_to_le32(mode);
3783 
3784 	error = send_message(mh, ms);
3785  out:
3786 	return error;
3787 }
3788 
3789 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3790 {
3791 	struct dlm_message *ms;
3792 	struct dlm_mhandle *mh;
3793 	int to_nodeid, error;
3794 
3795 	to_nodeid = dlm_dir_nodeid(r);
3796 
3797 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3798 	if (error)
3799 		return error;
3800 
3801 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3802 	if (error)
3803 		goto fail;
3804 
3805 	send_args(r, lkb, ms);
3806 
3807 	error = send_message(mh, ms);
3808 	if (error)
3809 		goto fail;
3810 	return 0;
3811 
3812  fail:
3813 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3814 	return error;
3815 }
3816 
3817 static int send_remove(struct dlm_rsb *r)
3818 {
3819 	struct dlm_message *ms;
3820 	struct dlm_mhandle *mh;
3821 	int to_nodeid, error;
3822 
3823 	to_nodeid = dlm_dir_nodeid(r);
3824 
3825 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3826 	if (error)
3827 		goto out;
3828 
3829 	memcpy(ms->m_extra, r->res_name, r->res_length);
3830 	ms->m_hash = cpu_to_le32(r->res_hash);
3831 
3832 	error = send_message(mh, ms);
3833  out:
3834 	return error;
3835 }
3836 
3837 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3838 			     int mstype, int rv)
3839 {
3840 	struct dlm_message *ms;
3841 	struct dlm_mhandle *mh;
3842 	int to_nodeid, error;
3843 
3844 	to_nodeid = lkb->lkb_nodeid;
3845 
3846 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3847 	if (error)
3848 		goto out;
3849 
3850 	send_args(r, lkb, ms);
3851 
3852 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3853 
3854 	error = send_message(mh, ms);
3855  out:
3856 	return error;
3857 }
3858 
3859 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3860 {
3861 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3862 }
3863 
3864 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3865 {
3866 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3867 }
3868 
3869 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3870 {
3871 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3872 }
3873 
3874 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3875 {
3876 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3877 }
3878 
3879 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3880 			     int ret_nodeid, int rv)
3881 {
3882 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3883 	struct dlm_message *ms;
3884 	struct dlm_mhandle *mh;
3885 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3886 
3887 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3888 	if (error)
3889 		goto out;
3890 
3891 	ms->m_lkid = ms_in->m_lkid;
3892 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3893 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3894 
3895 	error = send_message(mh, ms);
3896  out:
3897 	return error;
3898 }
3899 
3900 /* which args we save from a received message depends heavily on the type
3901    of message, unlike the send side where we can safely send everything about
3902    the lkb for any type of message */
3903 
3904 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3905 {
3906 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3907 	lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3908 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3909 			  (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3910 }
3911 
3912 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3913 {
3914 	if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
3915 		return;
3916 
3917 	lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3918 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3919 			 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3920 }
3921 
3922 static int receive_extralen(struct dlm_message *ms)
3923 {
3924 	return (le16_to_cpu(ms->m_header.h_length) -
3925 		sizeof(struct dlm_message));
3926 }
3927 
3928 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3929 		       struct dlm_message *ms)
3930 {
3931 	int len;
3932 
3933 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3934 		if (!lkb->lkb_lvbptr)
3935 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3936 		if (!lkb->lkb_lvbptr)
3937 			return -ENOMEM;
3938 		len = receive_extralen(ms);
3939 		if (len > ls->ls_lvblen)
3940 			len = ls->ls_lvblen;
3941 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3942 	}
3943 	return 0;
3944 }
3945 
/*
 * Placeholder blocking-ast callback installed by receive_request_args() to
 * record that the requester has a bast; it only logs if it is ever invoked.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3950 
/*
 * Placeholder completion-ast callback installed by receive_request_args()
 * to record that the requester has an ast; it only logs if it is ever
 * invoked.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3955 
3956 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3957 				struct dlm_message *ms)
3958 {
3959 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3960 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3961 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3962 	lkb->lkb_grmode = DLM_LOCK_IV;
3963 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3964 
3965 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3966 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3967 
3968 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3969 		/* lkb was just created so there won't be an lvb yet */
3970 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3971 		if (!lkb->lkb_lvbptr)
3972 			return -ENOMEM;
3973 	}
3974 
3975 	return 0;
3976 }
3977 
3978 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3979 				struct dlm_message *ms)
3980 {
3981 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3982 		return -EBUSY;
3983 
3984 	if (receive_lvb(ls, lkb, ms))
3985 		return -ENOMEM;
3986 
3987 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3988 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3989 
3990 	return 0;
3991 }
3992 
3993 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3994 			       struct dlm_message *ms)
3995 {
3996 	if (receive_lvb(ls, lkb, ms))
3997 		return -ENOMEM;
3998 	return 0;
3999 }
4000 
4001 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
4002    uses to send a reply and that the remote end uses to process the reply. */
4003 
4004 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
4005 {
4006 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
4007 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4008 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4009 }
4010 
4011 /* This is called after the rsb is locked so that we can safely inspect
4012    fields in the lkb. */
4013 
4014 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
4015 {
4016 	int from = le32_to_cpu(ms->m_header.h_nodeid);
4017 	int error = 0;
4018 
4019 	/* currently mixing of user/kernel locks are not supported */
4020 	if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
4021 	    ~lkb->lkb_flags & DLM_IFL_USER) {
4022 		log_error(lkb->lkb_resource->res_ls,
4023 			  "got user dlm message for a kernel lock");
4024 		error = -EINVAL;
4025 		goto out;
4026 	}
4027 
4028 	switch (ms->m_type) {
4029 	case cpu_to_le32(DLM_MSG_CONVERT):
4030 	case cpu_to_le32(DLM_MSG_UNLOCK):
4031 	case cpu_to_le32(DLM_MSG_CANCEL):
4032 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
4033 			error = -EINVAL;
4034 		break;
4035 
4036 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4037 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4038 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4039 	case cpu_to_le32(DLM_MSG_GRANT):
4040 	case cpu_to_le32(DLM_MSG_BAST):
4041 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4042 			error = -EINVAL;
4043 		break;
4044 
4045 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4046 		if (!is_process_copy(lkb))
4047 			error = -EINVAL;
4048 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4049 			error = -EINVAL;
4050 		break;
4051 
4052 	default:
4053 		error = -EINVAL;
4054 	}
4055 
4056 out:
4057 	if (error)
4058 		log_error(lkb->lkb_resource->res_ls,
4059 			  "ignore invalid message %d from %d %x %x %x %d",
4060 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
4061 			  lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
4062 	return error;
4063 }
4064 
4065 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4066 {
4067 	char name[DLM_RESNAME_MAXLEN + 1];
4068 	struct dlm_message *ms;
4069 	struct dlm_mhandle *mh;
4070 	struct dlm_rsb *r;
4071 	uint32_t hash, b;
4072 	int rv, dir_nodeid;
4073 
4074 	memset(name, 0, sizeof(name));
4075 	memcpy(name, ms_name, len);
4076 
4077 	hash = jhash(name, len, 0);
4078 	b = hash & (ls->ls_rsbtbl_size - 1);
4079 
4080 	dir_nodeid = dlm_hash2nodeid(ls, hash);
4081 
4082 	log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4083 
4084 	spin_lock(&ls->ls_rsbtbl[b].lock);
4085 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4086 	if (!rv) {
4087 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4088 		log_error(ls, "repeat_remove on keep %s", name);
4089 		return;
4090 	}
4091 
4092 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4093 	if (!rv) {
4094 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4095 		log_error(ls, "repeat_remove on toss %s", name);
4096 		return;
4097 	}
4098 
4099 	/* use ls->remove_name2 to avoid conflict with shrink? */
4100 
4101 	spin_lock(&ls->ls_remove_spin);
4102 	ls->ls_remove_len = len;
4103 	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4104 	spin_unlock(&ls->ls_remove_spin);
4105 	spin_unlock(&ls->ls_rsbtbl[b].lock);
4106 
4107 	rv = _create_message(ls, sizeof(struct dlm_message) + len,
4108 			     dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4109 	if (rv)
4110 		goto out;
4111 
4112 	memcpy(ms->m_extra, name, len);
4113 	ms->m_hash = cpu_to_le32(hash);
4114 
4115 	send_message(mh, ms);
4116 
4117 out:
4118 	spin_lock(&ls->ls_remove_spin);
4119 	ls->ls_remove_len = 0;
4120 	memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4121 	spin_unlock(&ls->ls_remove_spin);
4122 	wake_up(&ls->ls_remove_wait);
4123 }
4124 
4125 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4126 {
4127 	struct dlm_lkb *lkb;
4128 	struct dlm_rsb *r;
4129 	int from_nodeid;
4130 	int error, namelen = 0;
4131 
4132 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4133 
4134 	error = create_lkb(ls, &lkb);
4135 	if (error)
4136 		goto fail;
4137 
4138 	receive_flags(lkb, ms);
4139 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4140 	error = receive_request_args(ls, lkb, ms);
4141 	if (error) {
4142 		__put_lkb(ls, lkb);
4143 		goto fail;
4144 	}
4145 
4146 	/* The dir node is the authority on whether we are the master
4147 	   for this rsb or not, so if the master sends us a request, we should
4148 	   recreate the rsb if we've destroyed it.   This race happens when we
4149 	   send a remove message to the dir node at the same time that the dir
4150 	   node sends us a request for the rsb. */
4151 
4152 	namelen = receive_extralen(ms);
4153 
4154 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4155 			 R_RECEIVE_REQUEST, &r);
4156 	if (error) {
4157 		__put_lkb(ls, lkb);
4158 		goto fail;
4159 	}
4160 
4161 	lock_rsb(r);
4162 
4163 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4164 		error = validate_master_nodeid(ls, r, from_nodeid);
4165 		if (error) {
4166 			unlock_rsb(r);
4167 			put_rsb(r);
4168 			__put_lkb(ls, lkb);
4169 			goto fail;
4170 		}
4171 	}
4172 
4173 	attach_lkb(r, lkb);
4174 	error = do_request(r, lkb);
4175 	send_request_reply(r, lkb, error);
4176 	do_request_effects(r, lkb, error);
4177 
4178 	unlock_rsb(r);
4179 	put_rsb(r);
4180 
4181 	if (error == -EINPROGRESS)
4182 		error = 0;
4183 	if (error)
4184 		dlm_put_lkb(lkb);
4185 	return 0;
4186 
4187  fail:
4188 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4189 	   and do this receive_request again from process_lookup_list once
4190 	   we get the lookup reply.  This would avoid a many repeated
4191 	   ENOTBLK request failures when the lookup reply designating us
4192 	   as master is delayed. */
4193 
4194 	/* We could repeatedly return -EBADR here if our send_remove() is
4195 	   delayed in being sent/arriving/being processed on the dir node.
4196 	   Another node would repeatedly lookup up the master, and the dir
4197 	   node would continue returning our nodeid until our send_remove
4198 	   took effect.
4199 
4200 	   We send another remove message in case our previous send_remove
4201 	   was lost/ignored/missed somehow. */
4202 
4203 	if (error != -ENOTBLK) {
4204 		log_limit(ls, "receive_request %x from %d %d",
4205 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4206 	}
4207 
4208 	if (namelen && error == -EBADR) {
4209 		send_repeat_remove(ls, ms->m_extra, namelen);
4210 		msleep(1000);
4211 	}
4212 
4213 	setup_stub_lkb(ls, ms);
4214 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4215 	return error;
4216 }
4217 
4218 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4219 {
4220 	struct dlm_lkb *lkb;
4221 	struct dlm_rsb *r;
4222 	int error, reply = 1;
4223 
4224 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4225 	if (error)
4226 		goto fail;
4227 
4228 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4229 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4230 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4231 			  (unsigned long long)lkb->lkb_recover_seq,
4232 			  le32_to_cpu(ms->m_header.h_nodeid),
4233 			  le32_to_cpu(ms->m_lkid));
4234 		error = -ENOENT;
4235 		dlm_put_lkb(lkb);
4236 		goto fail;
4237 	}
4238 
4239 	r = lkb->lkb_resource;
4240 
4241 	hold_rsb(r);
4242 	lock_rsb(r);
4243 
4244 	error = validate_message(lkb, ms);
4245 	if (error)
4246 		goto out;
4247 
4248 	receive_flags(lkb, ms);
4249 
4250 	error = receive_convert_args(ls, lkb, ms);
4251 	if (error) {
4252 		send_convert_reply(r, lkb, error);
4253 		goto out;
4254 	}
4255 
4256 	reply = !down_conversion(lkb);
4257 
4258 	error = do_convert(r, lkb);
4259 	if (reply)
4260 		send_convert_reply(r, lkb, error);
4261 	do_convert_effects(r, lkb, error);
4262  out:
4263 	unlock_rsb(r);
4264 	put_rsb(r);
4265 	dlm_put_lkb(lkb);
4266 	return 0;
4267 
4268  fail:
4269 	setup_stub_lkb(ls, ms);
4270 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4271 	return error;
4272 }
4273 
4274 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4275 {
4276 	struct dlm_lkb *lkb;
4277 	struct dlm_rsb *r;
4278 	int error;
4279 
4280 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4281 	if (error)
4282 		goto fail;
4283 
4284 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4285 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4286 			  lkb->lkb_id, lkb->lkb_remid,
4287 			  le32_to_cpu(ms->m_header.h_nodeid),
4288 			  le32_to_cpu(ms->m_lkid));
4289 		error = -ENOENT;
4290 		dlm_put_lkb(lkb);
4291 		goto fail;
4292 	}
4293 
4294 	r = lkb->lkb_resource;
4295 
4296 	hold_rsb(r);
4297 	lock_rsb(r);
4298 
4299 	error = validate_message(lkb, ms);
4300 	if (error)
4301 		goto out;
4302 
4303 	receive_flags(lkb, ms);
4304 
4305 	error = receive_unlock_args(ls, lkb, ms);
4306 	if (error) {
4307 		send_unlock_reply(r, lkb, error);
4308 		goto out;
4309 	}
4310 
4311 	error = do_unlock(r, lkb);
4312 	send_unlock_reply(r, lkb, error);
4313 	do_unlock_effects(r, lkb, error);
4314  out:
4315 	unlock_rsb(r);
4316 	put_rsb(r);
4317 	dlm_put_lkb(lkb);
4318 	return 0;
4319 
4320  fail:
4321 	setup_stub_lkb(ls, ms);
4322 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4323 	return error;
4324 }
4325 
4326 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4327 {
4328 	struct dlm_lkb *lkb;
4329 	struct dlm_rsb *r;
4330 	int error;
4331 
4332 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4333 	if (error)
4334 		goto fail;
4335 
4336 	receive_flags(lkb, ms);
4337 
4338 	r = lkb->lkb_resource;
4339 
4340 	hold_rsb(r);
4341 	lock_rsb(r);
4342 
4343 	error = validate_message(lkb, ms);
4344 	if (error)
4345 		goto out;
4346 
4347 	error = do_cancel(r, lkb);
4348 	send_cancel_reply(r, lkb, error);
4349 	do_cancel_effects(r, lkb, error);
4350  out:
4351 	unlock_rsb(r);
4352 	put_rsb(r);
4353 	dlm_put_lkb(lkb);
4354 	return 0;
4355 
4356  fail:
4357 	setup_stub_lkb(ls, ms);
4358 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4359 	return error;
4360 }
4361 
4362 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4363 {
4364 	struct dlm_lkb *lkb;
4365 	struct dlm_rsb *r;
4366 	int error;
4367 
4368 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4369 	if (error)
4370 		return error;
4371 
4372 	r = lkb->lkb_resource;
4373 
4374 	hold_rsb(r);
4375 	lock_rsb(r);
4376 
4377 	error = validate_message(lkb, ms);
4378 	if (error)
4379 		goto out;
4380 
4381 	receive_flags_reply(lkb, ms);
4382 	if (is_altmode(lkb))
4383 		munge_altmode(lkb, ms);
4384 	grant_lock_pc(r, lkb, ms);
4385 	queue_cast(r, lkb, 0);
4386  out:
4387 	unlock_rsb(r);
4388 	put_rsb(r);
4389 	dlm_put_lkb(lkb);
4390 	return 0;
4391 }
4392 
4393 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4394 {
4395 	struct dlm_lkb *lkb;
4396 	struct dlm_rsb *r;
4397 	int error;
4398 
4399 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4400 	if (error)
4401 		return error;
4402 
4403 	r = lkb->lkb_resource;
4404 
4405 	hold_rsb(r);
4406 	lock_rsb(r);
4407 
4408 	error = validate_message(lkb, ms);
4409 	if (error)
4410 		goto out;
4411 
4412 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4413 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4414  out:
4415 	unlock_rsb(r);
4416 	put_rsb(r);
4417 	dlm_put_lkb(lkb);
4418 	return 0;
4419 }
4420 
4421 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4422 {
4423 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4424 
4425 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4426 	our_nodeid = dlm_our_nodeid();
4427 
4428 	len = receive_extralen(ms);
4429 
4430 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4431 				  &ret_nodeid, NULL);
4432 
4433 	/* Optimization: we're master so treat lookup as a request */
4434 	if (!error && ret_nodeid == our_nodeid) {
4435 		receive_request(ls, ms);
4436 		return;
4437 	}
4438 	send_lookup_reply(ls, ms, ret_nodeid, error);
4439 }
4440 
4441 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4442 {
4443 	char name[DLM_RESNAME_MAXLEN+1];
4444 	struct dlm_rsb *r;
4445 	uint32_t hash, b;
4446 	int rv, len, dir_nodeid, from_nodeid;
4447 
4448 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4449 
4450 	len = receive_extralen(ms);
4451 
4452 	if (len > DLM_RESNAME_MAXLEN) {
4453 		log_error(ls, "receive_remove from %d bad len %d",
4454 			  from_nodeid, len);
4455 		return;
4456 	}
4457 
4458 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4459 	if (dir_nodeid != dlm_our_nodeid()) {
4460 		log_error(ls, "receive_remove from %d bad nodeid %d",
4461 			  from_nodeid, dir_nodeid);
4462 		return;
4463 	}
4464 
4465 	/* Look for name on rsbtbl.toss, if it's there, kill it.
4466 	   If it's on rsbtbl.keep, it's being used, and we should ignore this
4467 	   message.  This is an expected race between the dir node sending a
4468 	   request to the master node at the same time as the master node sends
4469 	   a remove to the dir node.  The resolution to that race is for the
4470 	   dir node to ignore the remove message, and the master node to
4471 	   recreate the master rsb when it gets a request from the dir node for
4472 	   an rsb it doesn't have. */
4473 
4474 	memset(name, 0, sizeof(name));
4475 	memcpy(name, ms->m_extra, len);
4476 
4477 	hash = jhash(name, len, 0);
4478 	b = hash & (ls->ls_rsbtbl_size - 1);
4479 
4480 	spin_lock(&ls->ls_rsbtbl[b].lock);
4481 
4482 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4483 	if (rv) {
4484 		/* verify the rsb is on keep list per comment above */
4485 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4486 		if (rv) {
4487 			/* should not happen */
4488 			log_error(ls, "receive_remove from %d not found %s",
4489 				  from_nodeid, name);
4490 			spin_unlock(&ls->ls_rsbtbl[b].lock);
4491 			return;
4492 		}
4493 		if (r->res_master_nodeid != from_nodeid) {
4494 			/* should not happen */
4495 			log_error(ls, "receive_remove keep from %d master %d",
4496 				  from_nodeid, r->res_master_nodeid);
4497 			dlm_print_rsb(r);
4498 			spin_unlock(&ls->ls_rsbtbl[b].lock);
4499 			return;
4500 		}
4501 
4502 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4503 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4504 			  name);
4505 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4506 		return;
4507 	}
4508 
4509 	if (r->res_master_nodeid != from_nodeid) {
4510 		log_error(ls, "receive_remove toss from %d master %d",
4511 			  from_nodeid, r->res_master_nodeid);
4512 		dlm_print_rsb(r);
4513 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4514 		return;
4515 	}
4516 
4517 	if (kref_put(&r->res_ref, kill_rsb)) {
4518 		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4519 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4520 		dlm_free_rsb(r);
4521 	} else {
4522 		log_error(ls, "receive_remove from %d rsb ref error",
4523 			  from_nodeid);
4524 		dlm_print_rsb(r);
4525 		spin_unlock(&ls->ls_rsbtbl[b].lock);
4526 	}
4527 }
4528 
4529 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4530 {
4531 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4532 }
4533 
4534 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4535 {
4536 	struct dlm_lkb *lkb;
4537 	struct dlm_rsb *r;
4538 	int error, mstype, result;
4539 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4540 
4541 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4542 	if (error)
4543 		return error;
4544 
4545 	r = lkb->lkb_resource;
4546 	hold_rsb(r);
4547 	lock_rsb(r);
4548 
4549 	error = validate_message(lkb, ms);
4550 	if (error)
4551 		goto out;
4552 
4553 	mstype = lkb->lkb_wait_type;
4554 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4555 	if (error) {
4556 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4557 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4558 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4559 		dlm_dump_rsb(r);
4560 		goto out;
4561 	}
4562 
4563 	/* Optimization: the dir node was also the master, so it took our
4564 	   lookup as a request and sent request reply instead of lookup reply */
4565 	if (mstype == DLM_MSG_LOOKUP) {
4566 		r->res_master_nodeid = from_nodeid;
4567 		r->res_nodeid = from_nodeid;
4568 		lkb->lkb_nodeid = from_nodeid;
4569 	}
4570 
4571 	/* this is the value returned from do_request() on the master */
4572 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4573 
4574 	switch (result) {
4575 	case -EAGAIN:
4576 		/* request would block (be queued) on remote master */
4577 		queue_cast(r, lkb, -EAGAIN);
4578 		confirm_master(r, -EAGAIN);
4579 		unhold_lkb(lkb); /* undoes create_lkb() */
4580 		break;
4581 
4582 	case -EINPROGRESS:
4583 	case 0:
4584 		/* request was queued or granted on remote master */
4585 		receive_flags_reply(lkb, ms);
4586 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4587 		if (is_altmode(lkb))
4588 			munge_altmode(lkb, ms);
4589 		if (result) {
4590 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4591 			add_timeout(lkb);
4592 		} else {
4593 			grant_lock_pc(r, lkb, ms);
4594 			queue_cast(r, lkb, 0);
4595 		}
4596 		confirm_master(r, result);
4597 		break;
4598 
4599 	case -EBADR:
4600 	case -ENOTBLK:
4601 		/* find_rsb failed to find rsb or rsb wasn't master */
4602 		log_limit(ls, "receive_request_reply %x from %d %d "
4603 			  "master %d dir %d first %x %s", lkb->lkb_id,
4604 			  from_nodeid, result, r->res_master_nodeid,
4605 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4606 
4607 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4608 		    r->res_master_nodeid != dlm_our_nodeid()) {
4609 			/* cause _request_lock->set_master->send_lookup */
4610 			r->res_master_nodeid = 0;
4611 			r->res_nodeid = -1;
4612 			lkb->lkb_nodeid = -1;
4613 		}
4614 
4615 		if (is_overlap(lkb)) {
4616 			/* we'll ignore error in cancel/unlock reply */
4617 			queue_cast_overlap(r, lkb);
4618 			confirm_master(r, result);
4619 			unhold_lkb(lkb); /* undoes create_lkb() */
4620 		} else {
4621 			_request_lock(r, lkb);
4622 
4623 			if (r->res_master_nodeid == dlm_our_nodeid())
4624 				confirm_master(r, 0);
4625 		}
4626 		break;
4627 
4628 	default:
4629 		log_error(ls, "receive_request_reply %x error %d",
4630 			  lkb->lkb_id, result);
4631 	}
4632 
4633 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4634 		log_debug(ls, "receive_request_reply %x result %d unlock",
4635 			  lkb->lkb_id, result);
4636 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4637 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4638 		send_unlock(r, lkb);
4639 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4640 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4641 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4642 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4643 		send_cancel(r, lkb);
4644 	} else {
4645 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4646 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4647 	}
4648  out:
4649 	unlock_rsb(r);
4650 	put_rsb(r);
4651 	dlm_put_lkb(lkb);
4652 	return 0;
4653 }
4654 
4655 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4656 				    struct dlm_message *ms)
4657 {
4658 	/* this is the value returned from do_convert() on the master */
4659 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4660 	case -EAGAIN:
4661 		/* convert would block (be queued) on remote master */
4662 		queue_cast(r, lkb, -EAGAIN);
4663 		break;
4664 
4665 	case -EDEADLK:
4666 		receive_flags_reply(lkb, ms);
4667 		revert_lock_pc(r, lkb);
4668 		queue_cast(r, lkb, -EDEADLK);
4669 		break;
4670 
4671 	case -EINPROGRESS:
4672 		/* convert was queued on remote master */
4673 		receive_flags_reply(lkb, ms);
4674 		if (is_demoted(lkb))
4675 			munge_demoted(lkb);
4676 		del_lkb(r, lkb);
4677 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4678 		add_timeout(lkb);
4679 		break;
4680 
4681 	case 0:
4682 		/* convert was granted on remote master */
4683 		receive_flags_reply(lkb, ms);
4684 		if (is_demoted(lkb))
4685 			munge_demoted(lkb);
4686 		grant_lock_pc(r, lkb, ms);
4687 		queue_cast(r, lkb, 0);
4688 		break;
4689 
4690 	default:
4691 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4692 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4693 			  le32_to_cpu(ms->m_lkid),
4694 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4695 		dlm_print_rsb(r);
4696 		dlm_print_lkb(lkb);
4697 	}
4698 }
4699 
4700 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4701 {
4702 	struct dlm_rsb *r = lkb->lkb_resource;
4703 	int error;
4704 
4705 	hold_rsb(r);
4706 	lock_rsb(r);
4707 
4708 	error = validate_message(lkb, ms);
4709 	if (error)
4710 		goto out;
4711 
4712 	/* stub reply can happen with waiters_mutex held */
4713 	error = remove_from_waiters_ms(lkb, ms);
4714 	if (error)
4715 		goto out;
4716 
4717 	__receive_convert_reply(r, lkb, ms);
4718  out:
4719 	unlock_rsb(r);
4720 	put_rsb(r);
4721 }
4722 
4723 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4724 {
4725 	struct dlm_lkb *lkb;
4726 	int error;
4727 
4728 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4729 	if (error)
4730 		return error;
4731 
4732 	_receive_convert_reply(lkb, ms);
4733 	dlm_put_lkb(lkb);
4734 	return 0;
4735 }
4736 
4737 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4738 {
4739 	struct dlm_rsb *r = lkb->lkb_resource;
4740 	int error;
4741 
4742 	hold_rsb(r);
4743 	lock_rsb(r);
4744 
4745 	error = validate_message(lkb, ms);
4746 	if (error)
4747 		goto out;
4748 
4749 	/* stub reply can happen with waiters_mutex held */
4750 	error = remove_from_waiters_ms(lkb, ms);
4751 	if (error)
4752 		goto out;
4753 
4754 	/* this is the value returned from do_unlock() on the master */
4755 
4756 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4757 	case -DLM_EUNLOCK:
4758 		receive_flags_reply(lkb, ms);
4759 		remove_lock_pc(r, lkb);
4760 		queue_cast(r, lkb, -DLM_EUNLOCK);
4761 		break;
4762 	case -ENOENT:
4763 		break;
4764 	default:
4765 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4766 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4767 	}
4768  out:
4769 	unlock_rsb(r);
4770 	put_rsb(r);
4771 }
4772 
4773 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4774 {
4775 	struct dlm_lkb *lkb;
4776 	int error;
4777 
4778 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4779 	if (error)
4780 		return error;
4781 
4782 	_receive_unlock_reply(lkb, ms);
4783 	dlm_put_lkb(lkb);
4784 	return 0;
4785 }
4786 
4787 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4788 {
4789 	struct dlm_rsb *r = lkb->lkb_resource;
4790 	int error;
4791 
4792 	hold_rsb(r);
4793 	lock_rsb(r);
4794 
4795 	error = validate_message(lkb, ms);
4796 	if (error)
4797 		goto out;
4798 
4799 	/* stub reply can happen with waiters_mutex held */
4800 	error = remove_from_waiters_ms(lkb, ms);
4801 	if (error)
4802 		goto out;
4803 
4804 	/* this is the value returned from do_cancel() on the master */
4805 
4806 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4807 	case -DLM_ECANCEL:
4808 		receive_flags_reply(lkb, ms);
4809 		revert_lock_pc(r, lkb);
4810 		queue_cast(r, lkb, -DLM_ECANCEL);
4811 		break;
4812 	case 0:
4813 		break;
4814 	default:
4815 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4816 			  lkb->lkb_id,
4817 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4818 	}
4819  out:
4820 	unlock_rsb(r);
4821 	put_rsb(r);
4822 }
4823 
4824 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4825 {
4826 	struct dlm_lkb *lkb;
4827 	int error;
4828 
4829 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4830 	if (error)
4831 		return error;
4832 
4833 	_receive_cancel_reply(lkb, ms);
4834 	dlm_put_lkb(lkb);
4835 	return 0;
4836 }
4837 
4838 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4839 {
4840 	struct dlm_lkb *lkb;
4841 	struct dlm_rsb *r;
4842 	int error, ret_nodeid;
4843 	int do_lookup_list = 0;
4844 
4845 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4846 	if (error) {
4847 		log_error(ls, "%s no lkid %x", __func__,
4848 			  le32_to_cpu(ms->m_lkid));
4849 		return;
4850 	}
4851 
4852 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4853 	   FIXME: will a non-zero error ever be returned? */
4854 
4855 	r = lkb->lkb_resource;
4856 	hold_rsb(r);
4857 	lock_rsb(r);
4858 
4859 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4860 	if (error)
4861 		goto out;
4862 
4863 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4864 
4865 	/* We sometimes receive a request from the dir node for this
4866 	   rsb before we've received the dir node's loookup_reply for it.
4867 	   The request from the dir node implies we're the master, so we set
4868 	   ourself as master in receive_request_reply, and verify here that
4869 	   we are indeed the master. */
4870 
4871 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4872 		/* This should never happen */
4873 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4874 			  "master %d dir %d our %d first %x %s",
4875 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4876 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4877 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4878 	}
4879 
4880 	if (ret_nodeid == dlm_our_nodeid()) {
4881 		r->res_master_nodeid = ret_nodeid;
4882 		r->res_nodeid = 0;
4883 		do_lookup_list = 1;
4884 		r->res_first_lkid = 0;
4885 	} else if (ret_nodeid == -1) {
4886 		/* the remote node doesn't believe it's the dir node */
4887 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4888 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4889 		r->res_master_nodeid = 0;
4890 		r->res_nodeid = -1;
4891 		lkb->lkb_nodeid = -1;
4892 	} else {
4893 		/* set_master() will set lkb_nodeid from r */
4894 		r->res_master_nodeid = ret_nodeid;
4895 		r->res_nodeid = ret_nodeid;
4896 	}
4897 
4898 	if (is_overlap(lkb)) {
4899 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4900 			  lkb->lkb_id, lkb->lkb_flags);
4901 		queue_cast_overlap(r, lkb);
4902 		unhold_lkb(lkb); /* undoes create_lkb() */
4903 		goto out_list;
4904 	}
4905 
4906 	_request_lock(r, lkb);
4907 
4908  out_list:
4909 	if (do_lookup_list)
4910 		process_lookup_list(r);
4911  out:
4912 	unlock_rsb(r);
4913 	put_rsb(r);
4914 	dlm_put_lkb(lkb);
4915 }
4916 
4917 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4918 			     uint32_t saved_seq)
4919 {
4920 	int error = 0, noent = 0;
4921 
4922 	if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
4923 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4924 			  le32_to_cpu(ms->m_type),
4925 			  le32_to_cpu(ms->m_header.h_nodeid),
4926 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4927 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4928 		return;
4929 	}
4930 
4931 	switch (ms->m_type) {
4932 
4933 	/* messages sent to a master node */
4934 
4935 	case cpu_to_le32(DLM_MSG_REQUEST):
4936 		error = receive_request(ls, ms);
4937 		break;
4938 
4939 	case cpu_to_le32(DLM_MSG_CONVERT):
4940 		error = receive_convert(ls, ms);
4941 		break;
4942 
4943 	case cpu_to_le32(DLM_MSG_UNLOCK):
4944 		error = receive_unlock(ls, ms);
4945 		break;
4946 
4947 	case cpu_to_le32(DLM_MSG_CANCEL):
4948 		noent = 1;
4949 		error = receive_cancel(ls, ms);
4950 		break;
4951 
4952 	/* messages sent from a master node (replies to above) */
4953 
4954 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4955 		error = receive_request_reply(ls, ms);
4956 		break;
4957 
4958 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4959 		error = receive_convert_reply(ls, ms);
4960 		break;
4961 
4962 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4963 		error = receive_unlock_reply(ls, ms);
4964 		break;
4965 
4966 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4967 		error = receive_cancel_reply(ls, ms);
4968 		break;
4969 
4970 	/* messages sent from a master node (only two types of async msg) */
4971 
4972 	case cpu_to_le32(DLM_MSG_GRANT):
4973 		noent = 1;
4974 		error = receive_grant(ls, ms);
4975 		break;
4976 
4977 	case cpu_to_le32(DLM_MSG_BAST):
4978 		noent = 1;
4979 		error = receive_bast(ls, ms);
4980 		break;
4981 
4982 	/* messages sent to a dir node */
4983 
4984 	case cpu_to_le32(DLM_MSG_LOOKUP):
4985 		receive_lookup(ls, ms);
4986 		break;
4987 
4988 	case cpu_to_le32(DLM_MSG_REMOVE):
4989 		receive_remove(ls, ms);
4990 		break;
4991 
4992 	/* messages sent from a dir node (remove has no reply) */
4993 
4994 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4995 		receive_lookup_reply(ls, ms);
4996 		break;
4997 
4998 	/* other messages */
4999 
5000 	case cpu_to_le32(DLM_MSG_PURGE):
5001 		receive_purge(ls, ms);
5002 		break;
5003 
5004 	default:
5005 		log_error(ls, "unknown message type %d",
5006 			  le32_to_cpu(ms->m_type));
5007 	}
5008 
5009 	/*
5010 	 * When checking for ENOENT, we're checking the result of
5011 	 * find_lkb(m_remid):
5012 	 *
5013 	 * The lock id referenced in the message wasn't found.  This may
5014 	 * happen in normal usage for the async messages and cancel, so
5015 	 * only use log_debug for them.
5016 	 *
5017 	 * Some errors are expected and normal.
5018 	 */
5019 
5020 	if (error == -ENOENT && noent) {
5021 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
5022 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
5023 			  le32_to_cpu(ms->m_header.h_nodeid),
5024 			  le32_to_cpu(ms->m_lkid), saved_seq);
5025 	} else if (error == -ENOENT) {
5026 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
5027 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
5028 			  le32_to_cpu(ms->m_header.h_nodeid),
5029 			  le32_to_cpu(ms->m_lkid), saved_seq);
5030 
5031 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
5032 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
5033 	}
5034 
5035 	if (error == -EINVAL) {
5036 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
5037 			  "saved_seq %u",
5038 			  le32_to_cpu(ms->m_type),
5039 			  le32_to_cpu(ms->m_header.h_nodeid),
5040 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
5041 			  saved_seq);
5042 	}
5043 }
5044 
5045 /* If the lockspace is in recovery mode (locking stopped), then normal
5046    messages are saved on the requestqueue for processing after recovery is
5047    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
5048    messages off the requestqueue before we process new ones. This occurs right
5049    after recovery completes when we transition from saving all messages on
5050    requestqueue, to processing all the saved messages, to processing new
5051    messages as they arrive. */
5052 
5053 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5054 				int nodeid)
5055 {
5056 	if (dlm_locking_stopped(ls)) {
5057 		/* If we were a member of this lockspace, left, and rejoined,
5058 		   other nodes may still be sending us messages from the
5059 		   lockspace generation before we left. */
5060 		if (!ls->ls_generation) {
5061 			log_limit(ls, "receive %d from %d ignore old gen",
5062 				  le32_to_cpu(ms->m_type), nodeid);
5063 			return;
5064 		}
5065 
5066 		dlm_add_requestqueue(ls, nodeid, ms);
5067 	} else {
5068 		dlm_wait_requestqueue(ls);
5069 		_receive_message(ls, ms, 0);
5070 	}
5071 }
5072 
/* Called by dlm_recoverd for each message that was saved on the
   requestqueue; the message goes straight to _receive_message(), with
   saved_seq passed along for use in its log output. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
			       uint32_t saved_seq)
{
	_receive_message(ls, ms, saved_seq);
}
5081 
5082 /* This is called by the midcomms layer when something is received for
5083    the lockspace.  It could be either a MSG (normal message sent as part of
5084    standard locking activity) or an RCOM (recovery message sent as part of
5085    lockspace recovery). */
5086 
5087 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5088 {
5089 	struct dlm_header *hd = &p->header;
5090 	struct dlm_ls *ls;
5091 	int type = 0;
5092 
5093 	switch (hd->h_cmd) {
5094 	case DLM_MSG:
5095 		type = le32_to_cpu(p->message.m_type);
5096 		break;
5097 	case DLM_RCOM:
5098 		type = le32_to_cpu(p->rcom.rc_type);
5099 		break;
5100 	default:
5101 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5102 		return;
5103 	}
5104 
5105 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
5106 		log_print("invalid h_nodeid %d from %d lockspace %x",
5107 			  le32_to_cpu(hd->h_nodeid), nodeid,
5108 			  le32_to_cpu(hd->u.h_lockspace));
5109 		return;
5110 	}
5111 
5112 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
5113 	if (!ls) {
5114 		if (dlm_config.ci_log_debug) {
5115 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5116 				"%u from %d cmd %d type %d\n",
5117 				le32_to_cpu(hd->u.h_lockspace), nodeid,
5118 				hd->h_cmd, type);
5119 		}
5120 
5121 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5122 			dlm_send_ls_not_ready(nodeid, &p->rcom);
5123 		return;
5124 	}
5125 
5126 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5127 	   be inactive (in this ls) before transitioning to recovery mode */
5128 
5129 	down_read(&ls->ls_recv_active);
5130 	if (hd->h_cmd == DLM_MSG)
5131 		dlm_receive_message(ls, &p->message, nodeid);
5132 	else
5133 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5134 	up_read(&ls->ls_recv_active);
5135 
5136 	dlm_put_lockspace(ls);
5137 }
5138 
5139 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5140 				   struct dlm_message *ms_stub)
5141 {
5142 	if (middle_conversion(lkb)) {
5143 		hold_lkb(lkb);
5144 		memset(ms_stub, 0, sizeof(struct dlm_message));
5145 		ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5146 		ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5147 		ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5148 		ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5149 		_receive_convert_reply(lkb, ms_stub);
5150 
5151 		/* Same special case as in receive_rcom_lock_args() */
5152 		lkb->lkb_grmode = DLM_LOCK_IV;
5153 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5154 		unhold_lkb(lkb);
5155 
5156 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5157 		lkb->lkb_flags |= DLM_IFL_RESEND;
5158 	}
5159 
5160 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5161 	   conversions are async; there's no reply from the remote master */
5162 }
5163 
5164 /* A waiting lkb needs recovery if the master node has failed, or
5165    the master node is changing (only when no directory is used) */
5166 
5167 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5168 				 int dir_nodeid)
5169 {
5170 	if (dlm_no_directory(ls))
5171 		return 1;
5172 
5173 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5174 		return 1;
5175 
5176 	return 0;
5177 }
5178 
5179 /* Recovery for locks that are waiting for replies from nodes that are now
5180    gone.  We can just complete unlocks and cancels by faking a reply from the
5181    dead node.  Requests and up-conversions we flag to be resent after
5182    recovery.  Down-conversions can just be completed with a fake reply like
5183    unlocks.  Conversions between PR and CW need special attention. */
5184 
5185 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5186 {
5187 	struct dlm_lkb *lkb, *safe;
5188 	struct dlm_message *ms_stub;
5189 	int wait_type, stub_unlock_result, stub_cancel_result;
5190 	int dir_nodeid;
5191 
5192 	ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5193 	if (!ms_stub)
5194 		return;
5195 
5196 	mutex_lock(&ls->ls_waiters_mutex);
5197 
5198 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5199 
5200 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5201 
5202 		/* exclude debug messages about unlocks because there can be so
5203 		   many and they aren't very interesting */
5204 
5205 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5206 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5207 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5208 				  lkb->lkb_id,
5209 				  lkb->lkb_remid,
5210 				  lkb->lkb_wait_type,
5211 				  lkb->lkb_resource->res_nodeid,
5212 				  lkb->lkb_nodeid,
5213 				  lkb->lkb_wait_nodeid,
5214 				  dir_nodeid);
5215 		}
5216 
5217 		/* all outstanding lookups, regardless of destination  will be
5218 		   resent after recovery is done */
5219 
5220 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5221 			lkb->lkb_flags |= DLM_IFL_RESEND;
5222 			continue;
5223 		}
5224 
5225 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5226 			continue;
5227 
5228 		wait_type = lkb->lkb_wait_type;
5229 		stub_unlock_result = -DLM_EUNLOCK;
5230 		stub_cancel_result = -DLM_ECANCEL;
5231 
5232 		/* Main reply may have been received leaving a zero wait_type,
5233 		   but a reply for the overlapping op may not have been
5234 		   received.  In that case we need to fake the appropriate
5235 		   reply for the overlap op. */
5236 
5237 		if (!wait_type) {
5238 			if (is_overlap_cancel(lkb)) {
5239 				wait_type = DLM_MSG_CANCEL;
5240 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5241 					stub_cancel_result = 0;
5242 			}
5243 			if (is_overlap_unlock(lkb)) {
5244 				wait_type = DLM_MSG_UNLOCK;
5245 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5246 					stub_unlock_result = -ENOENT;
5247 			}
5248 
5249 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5250 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
5251 				  stub_cancel_result, stub_unlock_result);
5252 		}
5253 
5254 		switch (wait_type) {
5255 
5256 		case DLM_MSG_REQUEST:
5257 			lkb->lkb_flags |= DLM_IFL_RESEND;
5258 			break;
5259 
5260 		case DLM_MSG_CONVERT:
5261 			recover_convert_waiter(ls, lkb, ms_stub);
5262 			break;
5263 
5264 		case DLM_MSG_UNLOCK:
5265 			hold_lkb(lkb);
5266 			memset(ms_stub, 0, sizeof(struct dlm_message));
5267 			ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5268 			ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5269 			ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
5270 			ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5271 			_receive_unlock_reply(lkb, ms_stub);
5272 			dlm_put_lkb(lkb);
5273 			break;
5274 
5275 		case DLM_MSG_CANCEL:
5276 			hold_lkb(lkb);
5277 			memset(ms_stub, 0, sizeof(struct dlm_message));
5278 			ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5279 			ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5280 			ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
5281 			ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5282 			_receive_cancel_reply(lkb, ms_stub);
5283 			dlm_put_lkb(lkb);
5284 			break;
5285 
5286 		default:
5287 			log_error(ls, "invalid lkb wait_type %d %d",
5288 				  lkb->lkb_wait_type, wait_type);
5289 		}
5290 		schedule();
5291 	}
5292 	mutex_unlock(&ls->ls_waiters_mutex);
5293 	kfree(ms_stub);
5294 }
5295 
5296 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5297 {
5298 	struct dlm_lkb *lkb = NULL, *iter;
5299 
5300 	mutex_lock(&ls->ls_waiters_mutex);
5301 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5302 		if (iter->lkb_flags & DLM_IFL_RESEND) {
5303 			hold_lkb(iter);
5304 			lkb = iter;
5305 			break;
5306 		}
5307 	}
5308 	mutex_unlock(&ls->ls_waiters_mutex);
5309 
5310 	return lkb;
5311 }
5312 
5313 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5314    master or dir-node for r.  Processing the lkb may result in it being placed
5315    back on waiters. */
5316 
5317 /* We do this after normal locking has been enabled and any saved messages
5318    (in requestqueue) have been processed.  We should be confident that at
5319    this point we won't get or process a reply to any of these waiting
5320    operations.  But, new ops may be coming in on the rsbs/locks here from
5321    userspace or remotely. */
5322 
/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */
5328 
5329 int dlm_recover_waiters_post(struct dlm_ls *ls)
5330 {
5331 	struct dlm_lkb *lkb;
5332 	struct dlm_rsb *r;
5333 	int error = 0, mstype, err, oc, ou;
5334 
5335 	while (1) {
5336 		if (dlm_locking_stopped(ls)) {
5337 			log_debug(ls, "recover_waiters_post aborted");
5338 			error = -EINTR;
5339 			break;
5340 		}
5341 
5342 		lkb = find_resend_waiter(ls);
5343 		if (!lkb)
5344 			break;
5345 
5346 		r = lkb->lkb_resource;
5347 		hold_rsb(r);
5348 		lock_rsb(r);
5349 
5350 		mstype = lkb->lkb_wait_type;
5351 		oc = is_overlap_cancel(lkb);
5352 		ou = is_overlap_unlock(lkb);
5353 		err = 0;
5354 
5355 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5356 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5357 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5358 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5359 			  dlm_dir_nodeid(r), oc, ou);
5360 
5361 		/* At this point we assume that we won't get a reply to any
5362 		   previous op or overlap op on this lock.  First, do a big
5363 		   remove_from_waiters() for all previous ops. */
5364 
5365 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
5366 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5367 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5368 		lkb->lkb_wait_type = 0;
5369 		/* drop all wait_count references we still
5370 		 * hold a reference for this iteration.
5371 		 */
5372 		while (lkb->lkb_wait_count) {
5373 			lkb->lkb_wait_count--;
5374 			unhold_lkb(lkb);
5375 		}
5376 		mutex_lock(&ls->ls_waiters_mutex);
5377 		list_del_init(&lkb->lkb_wait_reply);
5378 		mutex_unlock(&ls->ls_waiters_mutex);
5379 
5380 		if (oc || ou) {
5381 			/* do an unlock or cancel instead of resending */
5382 			switch (mstype) {
5383 			case DLM_MSG_LOOKUP:
5384 			case DLM_MSG_REQUEST:
5385 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5386 							-DLM_ECANCEL);
5387 				unhold_lkb(lkb); /* undoes create_lkb() */
5388 				break;
5389 			case DLM_MSG_CONVERT:
5390 				if (oc) {
5391 					queue_cast(r, lkb, -DLM_ECANCEL);
5392 				} else {
5393 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5394 					_unlock_lock(r, lkb);
5395 				}
5396 				break;
5397 			default:
5398 				err = 1;
5399 			}
5400 		} else {
5401 			switch (mstype) {
5402 			case DLM_MSG_LOOKUP:
5403 			case DLM_MSG_REQUEST:
5404 				_request_lock(r, lkb);
5405 				if (is_master(r))
5406 					confirm_master(r, 0);
5407 				break;
5408 			case DLM_MSG_CONVERT:
5409 				_convert_lock(r, lkb);
5410 				break;
5411 			default:
5412 				err = 1;
5413 			}
5414 		}
5415 
5416 		if (err) {
5417 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5418 				  "dir_nodeid %d overlap %d %d",
5419 				  lkb->lkb_id, mstype, r->res_nodeid,
5420 				  dlm_dir_nodeid(r), oc, ou);
5421 		}
5422 		unlock_rsb(r);
5423 		put_rsb(r);
5424 		dlm_put_lkb(lkb);
5425 	}
5426 
5427 	return error;
5428 }
5429 
5430 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5431 			      struct list_head *list)
5432 {
5433 	struct dlm_lkb *lkb, *safe;
5434 
5435 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5436 		if (!is_master_copy(lkb))
5437 			continue;
5438 
5439 		/* don't purge lkbs we've added in recover_master_copy for
5440 		   the current recovery seq */
5441 
5442 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5443 			continue;
5444 
5445 		del_lkb(r, lkb);
5446 
5447 		/* this put should free the lkb */
5448 		if (!dlm_put_lkb(lkb))
5449 			log_error(ls, "purged mstcpy lkb not released");
5450 	}
5451 }
5452 
5453 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5454 {
5455 	struct dlm_ls *ls = r->res_ls;
5456 
5457 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5458 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5459 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5460 }
5461 
5462 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5463 			    struct list_head *list,
5464 			    int nodeid_gone, unsigned int *count)
5465 {
5466 	struct dlm_lkb *lkb, *safe;
5467 
5468 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5469 		if (!is_master_copy(lkb))
5470 			continue;
5471 
5472 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5473 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5474 
5475 			/* tell recover_lvb to invalidate the lvb
5476 			   because a node holding EX/PW failed */
5477 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5478 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5479 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5480 			}
5481 
5482 			del_lkb(r, lkb);
5483 
5484 			/* this put should free the lkb */
5485 			if (!dlm_put_lkb(lkb))
5486 				log_error(ls, "purged dead lkb not released");
5487 
5488 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5489 
5490 			(*count)++;
5491 		}
5492 	}
5493 }
5494 
5495 /* Get rid of locks held by nodes that are gone. */
5496 
5497 void dlm_recover_purge(struct dlm_ls *ls)
5498 {
5499 	struct dlm_rsb *r;
5500 	struct dlm_member *memb;
5501 	int nodes_count = 0;
5502 	int nodeid_gone = 0;
5503 	unsigned int lkb_count = 0;
5504 
5505 	/* cache one removed nodeid to optimize the common
5506 	   case of a single node removed */
5507 
5508 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5509 		nodes_count++;
5510 		nodeid_gone = memb->nodeid;
5511 	}
5512 
5513 	if (!nodes_count)
5514 		return;
5515 
5516 	down_write(&ls->ls_root_sem);
5517 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5518 		hold_rsb(r);
5519 		lock_rsb(r);
5520 		if (is_master(r)) {
5521 			purge_dead_list(ls, r, &r->res_grantqueue,
5522 					nodeid_gone, &lkb_count);
5523 			purge_dead_list(ls, r, &r->res_convertqueue,
5524 					nodeid_gone, &lkb_count);
5525 			purge_dead_list(ls, r, &r->res_waitqueue,
5526 					nodeid_gone, &lkb_count);
5527 		}
5528 		unlock_rsb(r);
5529 		unhold_rsb(r);
5530 		cond_resched();
5531 	}
5532 	up_write(&ls->ls_root_sem);
5533 
5534 	if (lkb_count)
5535 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5536 			  lkb_count, nodes_count);
5537 }
5538 
5539 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5540 {
5541 	struct rb_node *n;
5542 	struct dlm_rsb *r;
5543 
5544 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
5545 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5546 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
5547 
5548 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5549 			continue;
5550 		if (!is_master(r)) {
5551 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5552 			continue;
5553 		}
5554 		hold_rsb(r);
5555 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5556 		return r;
5557 	}
5558 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5559 	return NULL;
5560 }
5561 
5562 /*
5563  * Attempt to grant locks on resources that we are the master of.
5564  * Locks may have become grantable during recovery because locks
5565  * from departed nodes have been purged (or not rebuilt), allowing
5566  * previously blocked locks to now be granted.  The subset of rsb's
5567  * we are interested in are those with lkb's on either the convert or
5568  * waiting queues.
5569  *
5570  * Simplest would be to go through each master rsb and check for non-empty
5571  * convert or waiting queues, and attempt to grant on those rsbs.
5572  * Checking the queues requires lock_rsb, though, for which we'd need
5573  * to release the rsbtbl lock.  This would make iterating through all
5574  * rsb's very inefficient.  So, we rely on earlier recovery routines
5575  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5576  * locks for.
5577  */
5578 
5579 void dlm_recover_grant(struct dlm_ls *ls)
5580 {
5581 	struct dlm_rsb *r;
5582 	int bucket = 0;
5583 	unsigned int count = 0;
5584 	unsigned int rsb_count = 0;
5585 	unsigned int lkb_count = 0;
5586 
5587 	while (1) {
5588 		r = find_grant_rsb(ls, bucket);
5589 		if (!r) {
5590 			if (bucket == ls->ls_rsbtbl_size - 1)
5591 				break;
5592 			bucket++;
5593 			continue;
5594 		}
5595 		rsb_count++;
5596 		count = 0;
5597 		lock_rsb(r);
5598 		/* the RECOVER_GRANT flag is checked in the grant path */
5599 		grant_pending_locks(r, &count);
5600 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5601 		lkb_count += count;
5602 		confirm_master(r, 0);
5603 		unlock_rsb(r);
5604 		put_rsb(r);
5605 		cond_resched();
5606 	}
5607 
5608 	if (lkb_count)
5609 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5610 			  lkb_count, rsb_count);
5611 }
5612 
5613 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5614 					 uint32_t remid)
5615 {
5616 	struct dlm_lkb *lkb;
5617 
5618 	list_for_each_entry(lkb, head, lkb_statequeue) {
5619 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5620 			return lkb;
5621 	}
5622 	return NULL;
5623 }
5624 
5625 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5626 				    uint32_t remid)
5627 {
5628 	struct dlm_lkb *lkb;
5629 
5630 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5631 	if (lkb)
5632 		return lkb;
5633 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5634 	if (lkb)
5635 		return lkb;
5636 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5637 	if (lkb)
5638 		return lkb;
5639 	return NULL;
5640 }
5641 
5642 /* needs at least dlm_rcom + rcom_lock */
5643 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5644 				  struct dlm_rsb *r, struct dlm_rcom *rc)
5645 {
5646 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5647 
5648 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5649 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5650 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5651 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5652 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5653 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
5654 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5655 	lkb->lkb_rqmode = rl->rl_rqmode;
5656 	lkb->lkb_grmode = rl->rl_grmode;
5657 	/* don't set lkb_status because add_lkb wants to itself */
5658 
5659 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5660 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5661 
5662 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5663 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5664 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5665 		if (lvblen > ls->ls_lvblen)
5666 			return -EINVAL;
5667 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5668 		if (!lkb->lkb_lvbptr)
5669 			return -ENOMEM;
5670 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5671 	}
5672 
5673 	/* Conversions between PR and CW (middle modes) need special handling.
5674 	   The real granted mode of these converting locks cannot be determined
5675 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5676 
5677 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5678 	    middle_conversion(lkb)) {
5679 		rl->rl_status = DLM_LKSTS_CONVERT;
5680 		lkb->lkb_grmode = DLM_LOCK_IV;
5681 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5682 	}
5683 
5684 	return 0;
5685 }
5686 
5687 /* This lkb may have been recovered in a previous aborted recovery so we need
5688    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5689    If so we just send back a standard reply.  If not, we create a new lkb with
5690    the given values and send back our lkid.  We send back our lkid by sending
5691    back the rcom_lock struct we got but with the remid field filled in. */
5692 
5693 /* needs at least dlm_rcom + rcom_lock */
5694 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5695 {
5696 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5697 	struct dlm_rsb *r;
5698 	struct dlm_lkb *lkb;
5699 	uint32_t remid = 0;
5700 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5701 	int error;
5702 
5703 	if (rl->rl_parent_lkid) {
5704 		error = -EOPNOTSUPP;
5705 		goto out;
5706 	}
5707 
5708 	remid = le32_to_cpu(rl->rl_lkid);
5709 
5710 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5711 	   have to require it.  Recovery of masters on one node can overlap
5712 	   recovery of locks on another node, so one node can send us MSTCPY
5713 	   locks before we've made ourselves master of this rsb.  We can still
5714 	   add new MSTCPY locks that we receive here without any harm; when
5715 	   we make ourselves master, dlm_recover_masters() won't touch the
5716 	   MSTCPY locks we've received early. */
5717 
5718 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5719 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5720 	if (error)
5721 		goto out;
5722 
5723 	lock_rsb(r);
5724 
5725 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5726 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5727 			  from_nodeid, remid);
5728 		error = -EBADR;
5729 		goto out_unlock;
5730 	}
5731 
5732 	lkb = search_remid(r, from_nodeid, remid);
5733 	if (lkb) {
5734 		error = -EEXIST;
5735 		goto out_remid;
5736 	}
5737 
5738 	error = create_lkb(ls, &lkb);
5739 	if (error)
5740 		goto out_unlock;
5741 
5742 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5743 	if (error) {
5744 		__put_lkb(ls, lkb);
5745 		goto out_unlock;
5746 	}
5747 
5748 	attach_lkb(r, lkb);
5749 	add_lkb(r, lkb, rl->rl_status);
5750 	ls->ls_recover_locks_in++;
5751 
5752 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5753 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5754 
5755  out_remid:
5756 	/* this is the new value returned to the lock holder for
5757 	   saving in its process-copy lkb */
5758 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5759 
5760 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5761 
5762  out_unlock:
5763 	unlock_rsb(r);
5764 	put_rsb(r);
5765  out:
5766 	if (error && error != -EEXIST)
5767 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5768 			  from_nodeid, remid, error);
5769 	rl->rl_result = cpu_to_le32(error);
5770 	return error;
5771 }
5772 
5773 /* needs at least dlm_rcom + rcom_lock */
5774 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5775 {
5776 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5777 	struct dlm_rsb *r;
5778 	struct dlm_lkb *lkb;
5779 	uint32_t lkid, remid;
5780 	int error, result;
5781 
5782 	lkid = le32_to_cpu(rl->rl_lkid);
5783 	remid = le32_to_cpu(rl->rl_remid);
5784 	result = le32_to_cpu(rl->rl_result);
5785 
5786 	error = find_lkb(ls, lkid, &lkb);
5787 	if (error) {
5788 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5789 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5790 			  result);
5791 		return error;
5792 	}
5793 
5794 	r = lkb->lkb_resource;
5795 	hold_rsb(r);
5796 	lock_rsb(r);
5797 
5798 	if (!is_process_copy(lkb)) {
5799 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5800 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5801 			  result);
5802 		dlm_dump_rsb(r);
5803 		unlock_rsb(r);
5804 		put_rsb(r);
5805 		dlm_put_lkb(lkb);
5806 		return -EINVAL;
5807 	}
5808 
5809 	switch (result) {
5810 	case -EBADR:
5811 		/* There's a chance the new master received our lock before
5812 		   dlm_recover_master_reply(), this wouldn't happen if we did
5813 		   a barrier between recover_masters and recover_locks. */
5814 
5815 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5816 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5817 			  result);
5818 
5819 		dlm_send_rcom_lock(r, lkb);
5820 		goto out;
5821 	case -EEXIST:
5822 	case 0:
5823 		lkb->lkb_remid = remid;
5824 		break;
5825 	default:
5826 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5827 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5828 			  result);
5829 	}
5830 
5831 	/* an ack for dlm_recover_locks() which waits for replies from
5832 	   all the locks it sends to new masters */
5833 	dlm_recovered_lock(r);
5834  out:
5835 	unlock_rsb(r);
5836 	put_rsb(r);
5837 	dlm_put_lkb(lkb);
5838 
5839 	return 0;
5840 }
5841 
5842 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5843 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5844 		     unsigned long timeout_cs)
5845 {
5846 	struct dlm_lkb *lkb;
5847 	struct dlm_args args;
5848 	int error;
5849 
5850 	dlm_lock_recovery(ls);
5851 
5852 	error = create_lkb(ls, &lkb);
5853 	if (error) {
5854 		kfree(ua);
5855 		goto out;
5856 	}
5857 
5858 	if (flags & DLM_LKF_VALBLK) {
5859 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5860 		if (!ua->lksb.sb_lvbptr) {
5861 			kfree(ua);
5862 			__put_lkb(ls, lkb);
5863 			error = -ENOMEM;
5864 			goto out;
5865 		}
5866 	}
5867 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5868 			      fake_astfn, ua, fake_bastfn, &args);
5869 	if (error) {
5870 		kfree(ua->lksb.sb_lvbptr);
5871 		ua->lksb.sb_lvbptr = NULL;
5872 		kfree(ua);
5873 		__put_lkb(ls, lkb);
5874 		goto out;
5875 	}
5876 
5877 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5878 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
5879 	   lock and that lkb_astparam is the dlm_user_args structure. */
5880 	lkb->lkb_flags |= DLM_IFL_USER;
5881 	error = request_lock(ls, lkb, name, namelen, &args);
5882 
5883 	switch (error) {
5884 	case 0:
5885 		break;
5886 	case -EINPROGRESS:
5887 		error = 0;
5888 		break;
5889 	case -EAGAIN:
5890 		error = 0;
5891 		fallthrough;
5892 	default:
5893 		__put_lkb(ls, lkb);
5894 		goto out;
5895 	}
5896 
5897 	/* add this new lkb to the per-process list of locks */
5898 	spin_lock(&ua->proc->locks_spin);
5899 	hold_lkb(lkb);
5900 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5901 	spin_unlock(&ua->proc->locks_spin);
5902  out:
5903 	dlm_unlock_recovery(ls);
5904 	return error;
5905 }
5906 
5907 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5908 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5909 		     unsigned long timeout_cs)
5910 {
5911 	struct dlm_lkb *lkb;
5912 	struct dlm_args args;
5913 	struct dlm_user_args *ua;
5914 	int error;
5915 
5916 	dlm_lock_recovery(ls);
5917 
5918 	error = find_lkb(ls, lkid, &lkb);
5919 	if (error)
5920 		goto out;
5921 
5922 	/* user can change the params on its lock when it converts it, or
5923 	   add an lvb that didn't exist before */
5924 
5925 	ua = lkb->lkb_ua;
5926 
5927 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5928 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5929 		if (!ua->lksb.sb_lvbptr) {
5930 			error = -ENOMEM;
5931 			goto out_put;
5932 		}
5933 	}
5934 	if (lvb_in && ua->lksb.sb_lvbptr)
5935 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5936 
5937 	ua->xid = ua_tmp->xid;
5938 	ua->castparam = ua_tmp->castparam;
5939 	ua->castaddr = ua_tmp->castaddr;
5940 	ua->bastparam = ua_tmp->bastparam;
5941 	ua->bastaddr = ua_tmp->bastaddr;
5942 	ua->user_lksb = ua_tmp->user_lksb;
5943 
5944 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5945 			      fake_astfn, ua, fake_bastfn, &args);
5946 	if (error)
5947 		goto out_put;
5948 
5949 	error = convert_lock(ls, lkb, &args);
5950 
5951 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5952 		error = 0;
5953  out_put:
5954 	dlm_put_lkb(lkb);
5955  out:
5956 	dlm_unlock_recovery(ls);
5957 	kfree(ua_tmp);
5958 	return error;
5959 }
5960 
5961 /*
5962  * The caller asks for an orphan lock on a given resource with a given mode.
5963  * If a matching lock exists, it's moved to the owner's list of locks and
5964  * the lkid is returned.
5965  */
5966 
5967 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5968 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5969 		     unsigned long timeout_cs, uint32_t *lkid)
5970 {
5971 	struct dlm_lkb *lkb = NULL, *iter;
5972 	struct dlm_user_args *ua;
5973 	int found_other_mode = 0;
5974 	int rv = 0;
5975 
5976 	mutex_lock(&ls->ls_orphans_mutex);
5977 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5978 		if (iter->lkb_resource->res_length != namelen)
5979 			continue;
5980 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5981 			continue;
5982 		if (iter->lkb_grmode != mode) {
5983 			found_other_mode = 1;
5984 			continue;
5985 		}
5986 
5987 		lkb = iter;
5988 		list_del_init(&iter->lkb_ownqueue);
5989 		iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5990 		*lkid = iter->lkb_id;
5991 		break;
5992 	}
5993 	mutex_unlock(&ls->ls_orphans_mutex);
5994 
5995 	if (!lkb && found_other_mode) {
5996 		rv = -EAGAIN;
5997 		goto out;
5998 	}
5999 
6000 	if (!lkb) {
6001 		rv = -ENOENT;
6002 		goto out;
6003 	}
6004 
6005 	lkb->lkb_exflags = flags;
6006 	lkb->lkb_ownpid = (int) current->pid;
6007 
6008 	ua = lkb->lkb_ua;
6009 
6010 	ua->proc = ua_tmp->proc;
6011 	ua->xid = ua_tmp->xid;
6012 	ua->castparam = ua_tmp->castparam;
6013 	ua->castaddr = ua_tmp->castaddr;
6014 	ua->bastparam = ua_tmp->bastparam;
6015 	ua->bastaddr = ua_tmp->bastaddr;
6016 	ua->user_lksb = ua_tmp->user_lksb;
6017 
6018 	/*
6019 	 * The lkb reference from the ls_orphans list was not
6020 	 * removed above, and is now considered the reference
6021 	 * for the proc locks list.
6022 	 */
6023 
6024 	spin_lock(&ua->proc->locks_spin);
6025 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
6026 	spin_unlock(&ua->proc->locks_spin);
6027  out:
6028 	kfree(ua_tmp);
6029 	return rv;
6030 }
6031 
6032 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6033 		    uint32_t flags, uint32_t lkid, char *lvb_in)
6034 {
6035 	struct dlm_lkb *lkb;
6036 	struct dlm_args args;
6037 	struct dlm_user_args *ua;
6038 	int error;
6039 
6040 	dlm_lock_recovery(ls);
6041 
6042 	error = find_lkb(ls, lkid, &lkb);
6043 	if (error)
6044 		goto out;
6045 
6046 	ua = lkb->lkb_ua;
6047 
6048 	if (lvb_in && ua->lksb.sb_lvbptr)
6049 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
6050 	if (ua_tmp->castparam)
6051 		ua->castparam = ua_tmp->castparam;
6052 	ua->user_lksb = ua_tmp->user_lksb;
6053 
6054 	error = set_unlock_args(flags, ua, &args);
6055 	if (error)
6056 		goto out_put;
6057 
6058 	error = unlock_lock(ls, lkb, &args);
6059 
6060 	if (error == -DLM_EUNLOCK)
6061 		error = 0;
6062 	/* from validate_unlock_args() */
6063 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6064 		error = 0;
6065 	if (error)
6066 		goto out_put;
6067 
6068 	spin_lock(&ua->proc->locks_spin);
6069 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
6070 	if (!list_empty(&lkb->lkb_ownqueue))
6071 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6072 	spin_unlock(&ua->proc->locks_spin);
6073  out_put:
6074 	dlm_put_lkb(lkb);
6075  out:
6076 	dlm_unlock_recovery(ls);
6077 	kfree(ua_tmp);
6078 	return error;
6079 }
6080 
6081 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6082 		    uint32_t flags, uint32_t lkid)
6083 {
6084 	struct dlm_lkb *lkb;
6085 	struct dlm_args args;
6086 	struct dlm_user_args *ua;
6087 	int error;
6088 
6089 	dlm_lock_recovery(ls);
6090 
6091 	error = find_lkb(ls, lkid, &lkb);
6092 	if (error)
6093 		goto out;
6094 
6095 	ua = lkb->lkb_ua;
6096 	if (ua_tmp->castparam)
6097 		ua->castparam = ua_tmp->castparam;
6098 	ua->user_lksb = ua_tmp->user_lksb;
6099 
6100 	error = set_unlock_args(flags, ua, &args);
6101 	if (error)
6102 		goto out_put;
6103 
6104 	error = cancel_lock(ls, lkb, &args);
6105 
6106 	if (error == -DLM_ECANCEL)
6107 		error = 0;
6108 	/* from validate_unlock_args() */
6109 	if (error == -EBUSY)
6110 		error = 0;
6111  out_put:
6112 	dlm_put_lkb(lkb);
6113  out:
6114 	dlm_unlock_recovery(ls);
6115 	kfree(ua_tmp);
6116 	return error;
6117 }
6118 
6119 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6120 {
6121 	struct dlm_lkb *lkb;
6122 	struct dlm_args args;
6123 	struct dlm_user_args *ua;
6124 	struct dlm_rsb *r;
6125 	int error;
6126 
6127 	dlm_lock_recovery(ls);
6128 
6129 	error = find_lkb(ls, lkid, &lkb);
6130 	if (error)
6131 		goto out;
6132 
6133 	ua = lkb->lkb_ua;
6134 
6135 	error = set_unlock_args(flags, ua, &args);
6136 	if (error)
6137 		goto out_put;
6138 
6139 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6140 
6141 	r = lkb->lkb_resource;
6142 	hold_rsb(r);
6143 	lock_rsb(r);
6144 
6145 	error = validate_unlock_args(lkb, &args);
6146 	if (error)
6147 		goto out_r;
6148 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6149 
6150 	error = _cancel_lock(r, lkb);
6151  out_r:
6152 	unlock_rsb(r);
6153 	put_rsb(r);
6154 
6155 	if (error == -DLM_ECANCEL)
6156 		error = 0;
6157 	/* from validate_unlock_args() */
6158 	if (error == -EBUSY)
6159 		error = 0;
6160  out_put:
6161 	dlm_put_lkb(lkb);
6162  out:
6163 	dlm_unlock_recovery(ls);
6164 	return error;
6165 }
6166 
6167 /* lkb's that are removed from the waiters list by revert are just left on the
6168    orphans list with the granted orphan locks, to be freed by purge */
6169 
6170 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6171 {
6172 	struct dlm_args args;
6173 	int error;
6174 
6175 	hold_lkb(lkb); /* reference for the ls_orphans list */
6176 	mutex_lock(&ls->ls_orphans_mutex);
6177 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6178 	mutex_unlock(&ls->ls_orphans_mutex);
6179 
6180 	set_unlock_args(0, lkb->lkb_ua, &args);
6181 
6182 	error = cancel_lock(ls, lkb, &args);
6183 	if (error == -DLM_ECANCEL)
6184 		error = 0;
6185 	return error;
6186 }
6187 
6188 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6189    granted.  Regardless of what rsb queue the lock is on, it's removed and
6190    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6191    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6192 
6193 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6194 {
6195 	struct dlm_args args;
6196 	int error;
6197 
6198 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6199 			lkb->lkb_ua, &args);
6200 
6201 	error = unlock_lock(ls, lkb, &args);
6202 	if (error == -DLM_EUNLOCK)
6203 		error = 0;
6204 	return error;
6205 }
6206 
6207 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6208    (which does lock_rsb) due to deadlock with receiving a message that does
6209    lock_rsb followed by dlm_user_add_cb() */
6210 
6211 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6212 				     struct dlm_user_proc *proc)
6213 {
6214 	struct dlm_lkb *lkb = NULL;
6215 
6216 	mutex_lock(&ls->ls_clear_proc_locks);
6217 	if (list_empty(&proc->locks))
6218 		goto out;
6219 
6220 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6221 	list_del_init(&lkb->lkb_ownqueue);
6222 
6223 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6224 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
6225 	else
6226 		lkb->lkb_flags |= DLM_IFL_DEAD;
6227  out:
6228 	mutex_unlock(&ls->ls_clear_proc_locks);
6229 	return lkb;
6230 }
6231 
6232 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6233    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6234    which we clear here. */
6235 
/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  This assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourselves. */
6241 
6242 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6243 {
6244 	struct dlm_lkb *lkb, *safe;
6245 
6246 	dlm_lock_recovery(ls);
6247 
6248 	while (1) {
6249 		lkb = del_proc_lock(ls, proc);
6250 		if (!lkb)
6251 			break;
6252 		del_timeout(lkb);
6253 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6254 			orphan_proc_lock(ls, lkb);
6255 		else
6256 			unlock_proc_lock(ls, lkb);
6257 
6258 		/* this removes the reference for the proc->locks list
6259 		   added by dlm_user_request, it may result in the lkb
6260 		   being freed */
6261 
6262 		dlm_put_lkb(lkb);
6263 	}
6264 
6265 	mutex_lock(&ls->ls_clear_proc_locks);
6266 
6267 	/* in-progress unlocks */
6268 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6269 		list_del_init(&lkb->lkb_ownqueue);
6270 		lkb->lkb_flags |= DLM_IFL_DEAD;
6271 		dlm_put_lkb(lkb);
6272 	}
6273 
6274 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6275 		memset(&lkb->lkb_callbacks, 0,
6276 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6277 		list_del_init(&lkb->lkb_cb_list);
6278 		dlm_put_lkb(lkb);
6279 	}
6280 
6281 	mutex_unlock(&ls->ls_clear_proc_locks);
6282 	dlm_unlock_recovery(ls);
6283 }
6284 
6285 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6286 {
6287 	struct dlm_lkb *lkb, *safe;
6288 
6289 	while (1) {
6290 		lkb = NULL;
6291 		spin_lock(&proc->locks_spin);
6292 		if (!list_empty(&proc->locks)) {
6293 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6294 					 lkb_ownqueue);
6295 			list_del_init(&lkb->lkb_ownqueue);
6296 		}
6297 		spin_unlock(&proc->locks_spin);
6298 
6299 		if (!lkb)
6300 			break;
6301 
6302 		lkb->lkb_flags |= DLM_IFL_DEAD;
6303 		unlock_proc_lock(ls, lkb);
6304 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6305 	}
6306 
6307 	spin_lock(&proc->locks_spin);
6308 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6309 		list_del_init(&lkb->lkb_ownqueue);
6310 		lkb->lkb_flags |= DLM_IFL_DEAD;
6311 		dlm_put_lkb(lkb);
6312 	}
6313 	spin_unlock(&proc->locks_spin);
6314 
6315 	spin_lock(&proc->asts_spin);
6316 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6317 		memset(&lkb->lkb_callbacks, 0,
6318 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6319 		list_del_init(&lkb->lkb_cb_list);
6320 		dlm_put_lkb(lkb);
6321 	}
6322 	spin_unlock(&proc->asts_spin);
6323 }
6324 
6325 /* pid of 0 means purge all orphans */
6326 
6327 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6328 {
6329 	struct dlm_lkb *lkb, *safe;
6330 
6331 	mutex_lock(&ls->ls_orphans_mutex);
6332 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6333 		if (pid && lkb->lkb_ownpid != pid)
6334 			continue;
6335 		unlock_proc_lock(ls, lkb);
6336 		list_del_init(&lkb->lkb_ownqueue);
6337 		dlm_put_lkb(lkb);
6338 	}
6339 	mutex_unlock(&ls->ls_orphans_mutex);
6340 }
6341 
6342 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6343 {
6344 	struct dlm_message *ms;
6345 	struct dlm_mhandle *mh;
6346 	int error;
6347 
6348 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6349 				DLM_MSG_PURGE, &ms, &mh);
6350 	if (error)
6351 		return error;
6352 	ms->m_nodeid = cpu_to_le32(nodeid);
6353 	ms->m_pid = cpu_to_le32(pid);
6354 
6355 	return send_message(mh, ms);
6356 }
6357 
6358 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6359 		   int nodeid, int pid)
6360 {
6361 	int error = 0;
6362 
6363 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6364 		error = send_purge(ls, nodeid, pid);
6365 	} else {
6366 		dlm_lock_recovery(ls);
6367 		if (pid == current->pid)
6368 			purge_proc_locks(ls, proc);
6369 		else
6370 			do_purge(ls, nodeid, pid);
6371 		dlm_unlock_recovery(ls);
6372 	}
6373 	return error;
6374 }
6375 
6376 /* debug functionality */
6377 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6378 		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
6379 {
6380 	struct dlm_lksb *lksb;
6381 	struct dlm_lkb *lkb;
6382 	struct dlm_rsb *r;
6383 	int error;
6384 
6385 	/* we currently can't set a valid user lock */
6386 	if (lkb_flags & DLM_IFL_USER)
6387 		return -EOPNOTSUPP;
6388 
6389 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6390 	if (!lksb)
6391 		return -ENOMEM;
6392 
6393 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6394 	if (error) {
6395 		kfree(lksb);
6396 		return error;
6397 	}
6398 
6399 	lkb->lkb_flags = lkb_flags;
6400 	lkb->lkb_nodeid = lkb_nodeid;
6401 	lkb->lkb_lksb = lksb;
6402 	/* user specific pointer, just don't have it NULL for kernel locks */
6403 	if (~lkb_flags & DLM_IFL_USER)
6404 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6405 
6406 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6407 	if (error) {
6408 		kfree(lksb);
6409 		__put_lkb(ls, lkb);
6410 		return error;
6411 	}
6412 
6413 	lock_rsb(r);
6414 	attach_lkb(r, lkb);
6415 	add_lkb(r, lkb, lkb_status);
6416 	unlock_rsb(r);
6417 	put_rsb(r);
6418 
6419 	return 0;
6420 }
6421 
6422 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6423 				 int mstype, int to_nodeid)
6424 {
6425 	struct dlm_lkb *lkb;
6426 	int error;
6427 
6428 	error = find_lkb(ls, lkb_id, &lkb);
6429 	if (error)
6430 		return error;
6431 
6432 	error = add_to_waiters(lkb, mstype, to_nodeid);
6433 	dlm_put_lkb(lkb);
6434 	return error;
6435 }
6436 
6437