1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
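/*
 * Illustrative usage sketch (not part of the original file): how a
 * caller's arguments select one of the four stage-1 operations above.
 * The lockspace "ls", lksb, ast/bast callbacks and astarg "arg" are
 * hypothetical.
 *
 *   dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "res1", 4, 0, ast, arg, bast);
 *                                               -> request_lock()
 *   dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, "res1", 4, 0,
 *            ast, arg, bast);                   -> convert_lock()
 *   dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, arg);
 *                                               -> unlock_lock()
 *   dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, arg);
 *                                               -> cancel_lock()
 */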
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 
94 /*
95  * Lock compatibility matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
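/*
 * Worked examples, read straight from the table: converting NL->EX
 * returns the resource's LVB to the caller (1); converting PW->NL
 * writes the caller's LVB back to the resource (0); an unlock from NL
 * (NL row, UN column) leaves the LVB untouched (-1).
 */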
134 
135 #define modes_compat(gr, rq) \
136 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
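/*
 * Example reads of the matrix above (values follow directly from the
 * table; DLM_LOCK_NL=0 .. DLM_LOCK_EX=5, so the +1 skips the UN row):
 *
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1   shared readers
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0   writer excluded
 *   dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX) == 1   NL blocks nothing
 */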
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
164 	       "     status %d rqmode %d grmode %d wait_type %d\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type);
168 }
169 
170 static void dlm_print_rsb(struct dlm_rsb *r)
171 {
172 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
173 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
174 	       r->res_recover_locks_count, r->res_name);
175 }
176 
177 void dlm_dump_rsb(struct dlm_rsb *r)
178 {
179 	struct dlm_lkb *lkb;
180 
181 	dlm_print_rsb(r);
182 
183 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
184 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
185 	printk(KERN_ERR "rsb lookup list\n");
186 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
187 		dlm_print_lkb(lkb);
188 	printk(KERN_ERR "rsb grant queue:\n");
189 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb convert queue:\n");
192 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb wait queue:\n");
195 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 }
198 
199 /* Threads cannot use the lockspace while it's being recovered */
200 
201 static inline void dlm_lock_recovery(struct dlm_ls *ls)
202 {
203 	down_read(&ls->ls_in_recovery);
204 }
205 
206 void dlm_unlock_recovery(struct dlm_ls *ls)
207 {
208 	up_read(&ls->ls_in_recovery);
209 }
210 
211 int dlm_lock_recovery_try(struct dlm_ls *ls)
212 {
213 	return down_read_trylock(&ls->ls_in_recovery);
214 }
215 
216 static inline int can_be_queued(struct dlm_lkb *lkb)
217 {
218 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
219 }
220 
221 static inline int force_blocking_asts(struct dlm_lkb *lkb)
222 {
223 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
224 }
225 
226 static inline int is_demoted(struct dlm_lkb *lkb)
227 {
228 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
229 }
230 
231 static inline int is_altmode(struct dlm_lkb *lkb)
232 {
233 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
234 }
235 
236 static inline int is_granted(struct dlm_lkb *lkb)
237 {
238 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
239 }
240 
241 static inline int is_remote(struct dlm_rsb *r)
242 {
243 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
244 	return !!r->res_nodeid;
245 }
246 
247 static inline int is_process_copy(struct dlm_lkb *lkb)
248 {
249 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
250 }
251 
252 static inline int is_master_copy(struct dlm_lkb *lkb)
253 {
254 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
255 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
256 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257 }
258 
259 static inline int middle_conversion(struct dlm_lkb *lkb)
260 {
261 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263 		return 1;
264 	return 0;
265 }
266 
267 static inline int down_conversion(struct dlm_lkb *lkb)
268 {
269 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
270 }
271 
272 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 {
274 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
275 }
276 
277 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 {
279 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
280 }
281 
282 static inline int is_overlap(struct dlm_lkb *lkb)
283 {
284 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
285 				  DLM_IFL_OVERLAP_CANCEL));
286 }
287 
288 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 {
290 	if (is_master_copy(lkb))
291 		return;
292 
293 	del_timeout(lkb);
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	/* if the operation was a cancel, return -DLM_ECANCEL; if a
298 	   timeout caused the cancel, return -ETIMEDOUT */
299 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
300 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
301 		rv = -ETIMEDOUT;
302 	}
303 
304 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
305 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
306 		rv = -EDEADLK;
307 	}
308 
309 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
310 }
311 
312 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
313 {
314 	queue_cast(r, lkb,
315 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
316 }
317 
318 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 {
320 	if (is_master_copy(lkb)) {
321 		send_bast(r, lkb, rqmode);
322 	} else {
323 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
324 	}
325 }
326 
327 /*
328  * Basic operations on rsb's and lkb's
329  */
330 
331 static int pre_rsb_struct(struct dlm_ls *ls)
332 {
333 	struct dlm_rsb *r1, *r2;
334 	int count = 0;
335 
336 	spin_lock(&ls->ls_new_rsb_spin);
337 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
338 		spin_unlock(&ls->ls_new_rsb_spin);
339 		return 0;
340 	}
341 	spin_unlock(&ls->ls_new_rsb_spin);
342 
343 	r1 = dlm_allocate_rsb(ls);
344 	r2 = dlm_allocate_rsb(ls);
345 
346 	spin_lock(&ls->ls_new_rsb_spin);
347 	if (r1) {
348 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
349 		ls->ls_new_rsb_count++;
350 	}
351 	if (r2) {
352 		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
353 		ls->ls_new_rsb_count++;
354 	}
355 	count = ls->ls_new_rsb_count;
356 	spin_unlock(&ls->ls_new_rsb_spin);
357 
358 	if (!count)
359 		return -ENOMEM;
360 	return 0;
361 }
362 
363 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
364    unlock any spinlocks, go back and call pre_rsb_struct again.
365    Otherwise, take an rsb off the list and return it. */
366 
367 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
368 			  struct dlm_rsb **r_ret)
369 {
370 	struct dlm_rsb *r;
371 	int count;
372 
373 	spin_lock(&ls->ls_new_rsb_spin);
374 	if (list_empty(&ls->ls_new_rsb)) {
375 		count = ls->ls_new_rsb_count;
376 		spin_unlock(&ls->ls_new_rsb_spin);
377 		log_debug(ls, "find_rsb retry %d %d %s",
378 			  count, dlm_config.ci_new_rsb_count, name);
379 		return -EAGAIN;
380 	}
381 
382 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
383 	list_del(&r->res_hashchain);
384 	/* Convert the empty list_head to a NULL rb_node for tree usage: */
385 	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
386 	ls->ls_new_rsb_count--;
387 	spin_unlock(&ls->ls_new_rsb_spin);
388 
389 	r->res_ls = ls;
390 	r->res_length = len;
391 	memcpy(r->res_name, name, len);
392 	mutex_init(&r->res_mutex);
393 
394 	INIT_LIST_HEAD(&r->res_lookup);
395 	INIT_LIST_HEAD(&r->res_grantqueue);
396 	INIT_LIST_HEAD(&r->res_convertqueue);
397 	INIT_LIST_HEAD(&r->res_waitqueue);
398 	INIT_LIST_HEAD(&r->res_root_list);
399 	INIT_LIST_HEAD(&r->res_recover_list);
400 
401 	*r_ret = r;
402 	return 0;
403 }
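/*
 * Minimal sketch of the caller-side pattern for the two functions above
 * (find_rsb() below is the real version); "bucket", "name", "len" and
 * "r" are assumed locals:
 *
 *  retry:
 *	error = pre_rsb_struct(ls);	(may sleep; -ENOMEM if pool empty)
 *	if (error < 0)
 *		return error;
 *	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 *	error = get_rsb_struct(ls, name, len, &r);
 *	if (error == -EAGAIN) {
 *		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 *		goto retry;		(refill the pool, search again)
 *	}
 */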
404 
405 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
406 {
407 	char maxname[DLM_RESNAME_MAXLEN];
408 
409 	memset(maxname, 0, DLM_RESNAME_MAXLEN);
410 	memcpy(maxname, name, nlen);
411 	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
412 }
413 
414 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
415 			unsigned int flags, struct dlm_rsb **r_ret)
416 {
417 	struct rb_node *node = tree->rb_node;
418 	struct dlm_rsb *r;
419 	int error = 0;
420 	int rc;
421 
422 	while (node) {
423 		r = rb_entry(node, struct dlm_rsb, res_hashnode);
424 		rc = rsb_cmp(r, name, len);
425 		if (rc < 0)
426 			node = node->rb_left;
427 		else if (rc > 0)
428 			node = node->rb_right;
429 		else
430 			goto found;
431 	}
432 	*r_ret = NULL;
433 	return -EBADR;
434 
435  found:
436 	if (r->res_nodeid && (flags & R_MASTER))
437 		error = -ENOTBLK;
438 	*r_ret = r;
439 	return error;
440 }
441 
442 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
443 {
444 	struct rb_node **newn = &tree->rb_node;
445 	struct rb_node *parent = NULL;
446 	int rc;
447 
448 	while (*newn) {
449 		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
450 					       res_hashnode);
451 
452 		parent = *newn;
453 		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
454 		if (rc < 0)
455 			newn = &parent->rb_left;
456 		else if (rc > 0)
457 			newn = &parent->rb_right;
458 		else {
459 			log_print("rsb_insert match");
460 			dlm_dump_rsb(rsb);
461 			dlm_dump_rsb(cur);
462 			return -EEXIST;
463 		}
464 	}
465 
466 	rb_link_node(&rsb->res_hashnode, parent, newn);
467 	rb_insert_color(&rsb->res_hashnode, tree);
468 	return 0;
469 }
470 
471 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
472 		       unsigned int flags, struct dlm_rsb **r_ret)
473 {
474 	struct dlm_rsb *r;
475 	int error;
476 
477 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
478 	if (!error) {
479 		kref_get(&r->res_ref);
480 		goto out;
481 	}
482 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
483 	if (error)
484 		goto out;
485 
486 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
487 	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
488 	if (error)
489 		return error;
490 
491 	if (dlm_no_directory(ls))
492 		goto out;
493 
494 	if (r->res_nodeid == -1) {
495 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
496 		r->res_first_lkid = 0;
497 	} else if (r->res_nodeid > 0) {
498 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
499 		r->res_first_lkid = 0;
500 	} else {
501 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
502 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
503 	}
504  out:
505 	*r_ret = r;
506 	return error;
507 }
508 
509 /*
510  * Find rsb in rsbtbl and potentially create/add one
511  *
512  * Delaying the release of rsb's has a similar benefit to applications keeping
513  * NL locks on an rsb, but without the guarantee that the cached master value
514  * will still be valid when the rsb is reused.  Apps aren't always smart enough
515  * to keep NL locks on an rsb that they may lock again shortly; this can lead
516  * to excessive master lookups and removals if we don't delay the release.
517  *
518  * Searching for an rsb means looking through both the normal list and toss
519  * list.  When found on the toss list the rsb is moved to the normal list with
520  * ref count of 1; when found on normal list the ref count is incremented.
521  */
522 
523 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
524 		    unsigned int flags, struct dlm_rsb **r_ret)
525 {
526 	struct dlm_rsb *r = NULL;
527 	uint32_t hash, bucket;
528 	int error;
529 
530 	if (namelen > DLM_RESNAME_MAXLEN) {
531 		error = -EINVAL;
532 		goto out;
533 	}
534 
535 	if (dlm_no_directory(ls))
536 		flags |= R_CREATE;
537 
538 	hash = jhash(name, namelen, 0);
539 	bucket = hash & (ls->ls_rsbtbl_size - 1);
540 
541  retry:
542 	if (flags & R_CREATE) {
543 		error = pre_rsb_struct(ls);
544 		if (error < 0)
545 			goto out;
546 	}
547 
548 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
549 
550 	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
551 	if (!error)
552 		goto out_unlock;
553 
554 	if (error == -EBADR && !(flags & R_CREATE))
555 		goto out_unlock;
556 
557 	/* the rsb was found but wasn't a master copy */
558 	if (error == -ENOTBLK)
559 		goto out_unlock;
560 
561 	error = get_rsb_struct(ls, name, namelen, &r);
562 	if (error == -EAGAIN) {
563 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
564 		goto retry;
565 	}
566 	if (error)
567 		goto out_unlock;
568 
569 	r->res_hash = hash;
570 	r->res_bucket = bucket;
571 	r->res_nodeid = -1;
572 	kref_init(&r->res_ref);
573 
574 	/* With no directory, the master can be set immediately */
575 	if (dlm_no_directory(ls)) {
576 		int nodeid = dlm_dir_nodeid(r);
577 		if (nodeid == dlm_our_nodeid())
578 			nodeid = 0;
579 		r->res_nodeid = nodeid;
580 	}
581 	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
582  out_unlock:
583 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
584  out:
585 	*r_ret = r;
586 	return error;
587 }
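/*
 * Reference lifecycle sketch (hypothetical caller): the final put does
 * not free the rsb, it parks it on the toss list so a quick re-lookup
 * can revive it cheaply; shrink_bucket() frees entries that have sat
 * there longer than ci_toss_secs.
 *
 *	struct dlm_rsb *r;
 *	error = find_rsb(ls, "res1", 4, R_CREATE, &r);	(ref taken)
 *	...
 *	put_rsb(r);			(last ref: moved to toss list)
 */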
588 
589 /* This is only called to add a reference when the code already holds
590    a valid reference to the rsb, so there's no need for locking. */
591 
592 static inline void hold_rsb(struct dlm_rsb *r)
593 {
594 	kref_get(&r->res_ref);
595 }
596 
597 void dlm_hold_rsb(struct dlm_rsb *r)
598 {
599 	hold_rsb(r);
600 }
601 
602 static void toss_rsb(struct kref *kref)
603 {
604 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
605 	struct dlm_ls *ls = r->res_ls;
606 
607 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
608 	kref_init(&r->res_ref);
609 	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
610 	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
611 	r->res_toss_time = jiffies;
612 	if (r->res_lvbptr) {
613 		dlm_free_lvb(r->res_lvbptr);
614 		r->res_lvbptr = NULL;
615 	}
616 }
617 
618 /* When all references to the rsb are gone it's transferred to
619    the tossed list for later disposal. */
620 
621 static void put_rsb(struct dlm_rsb *r)
622 {
623 	struct dlm_ls *ls = r->res_ls;
624 	uint32_t bucket = r->res_bucket;
625 
626 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
627 	kref_put(&r->res_ref, toss_rsb);
628 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
629 }
630 
631 void dlm_put_rsb(struct dlm_rsb *r)
632 {
633 	put_rsb(r);
634 }
635 
636 /* See comment for unhold_lkb */
637 
638 static void unhold_rsb(struct dlm_rsb *r)
639 {
640 	int rv;
641 	rv = kref_put(&r->res_ref, toss_rsb);
642 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
643 }
644 
645 static void kill_rsb(struct kref *kref)
646 {
647 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
648 
649 	/* All work is done after the return from kref_put() so we
650 	   can release the write_lock before the remove and free. */
651 
652 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
653 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
654 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
655 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
656 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
657 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
658 }
659 
660 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
661    The rsb must exist as long as any lkb's for it do. */
662 
663 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
664 {
665 	hold_rsb(r);
666 	lkb->lkb_resource = r;
667 }
668 
669 static void detach_lkb(struct dlm_lkb *lkb)
670 {
671 	if (lkb->lkb_resource) {
672 		put_rsb(lkb->lkb_resource);
673 		lkb->lkb_resource = NULL;
674 	}
675 }
676 
677 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
678 {
679 	struct dlm_lkb *lkb;
680 	int rv, id;
681 
682 	lkb = dlm_allocate_lkb(ls);
683 	if (!lkb)
684 		return -ENOMEM;
685 
686 	lkb->lkb_nodeid = -1;
687 	lkb->lkb_grmode = DLM_LOCK_IV;
688 	kref_init(&lkb->lkb_ref);
689 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
690 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
691 	INIT_LIST_HEAD(&lkb->lkb_time_list);
692 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
693 	mutex_init(&lkb->lkb_cb_mutex);
694 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
695 
696  retry:
697 	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
698 	if (!rv)
699 		return -ENOMEM;
700 
701 	spin_lock(&ls->ls_lkbidr_spin);
702 	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
703 	if (!rv)
704 		lkb->lkb_id = id;
705 	spin_unlock(&ls->ls_lkbidr_spin);
706 
707 	if (rv == -EAGAIN)
708 		goto retry;
709 
710 	if (rv < 0) {
711 		log_error(ls, "create_lkb idr error %d", rv);
712 		return rv;
713 	}
714 
715 	*lkb_ret = lkb;
716 	return 0;
717 }
718 
719 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
720 {
721 	struct dlm_lkb *lkb;
722 
723 	spin_lock(&ls->ls_lkbidr_spin);
724 	lkb = idr_find(&ls->ls_lkbidr, lkid);
725 	if (lkb)
726 		kref_get(&lkb->lkb_ref);
727 	spin_unlock(&ls->ls_lkbidr_spin);
728 
729 	*lkb_ret = lkb;
730 	return lkb ? 0 : -ENOENT;
731 }
732 
733 static void kill_lkb(struct kref *kref)
734 {
735 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
736 
737 	/* All work is done after the return from kref_put() so we
738 	   can release the write_lock before the detach_lkb */
739 
740 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741 }
742 
743 /* __put_lkb() is used when an lkb may not have an rsb attached to
744    it so we need to provide the lockspace explicitly */
745 
746 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
747 {
748 	uint32_t lkid = lkb->lkb_id;
749 
750 	spin_lock(&ls->ls_lkbidr_spin);
751 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
752 		idr_remove(&ls->ls_lkbidr, lkid);
753 		spin_unlock(&ls->ls_lkbidr_spin);
754 
755 		detach_lkb(lkb);
756 
757 		/* for local/process lkbs, lvbptr points to caller's lksb */
758 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
759 			dlm_free_lvb(lkb->lkb_lvbptr);
760 		dlm_free_lkb(lkb);
761 		return 1;
762 	} else {
763 		spin_unlock(&ls->ls_lkbidr_spin);
764 		return 0;
765 	}
766 }
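/*
 * Idr lifecycle sketch for the functions above (hypothetical caller):
 *
 *	struct dlm_lkb *lkb, *same;
 *	create_lkb(ls, &lkb);			(ref = 1, lkb_id assigned)
 *	find_lkb(ls, lkb->lkb_id, &same);	(same == lkb, second ref)
 *	__put_lkb(ls, same);			(drop the lookup ref)
 *	__put_lkb(ls, lkb);			(final ref: idr_remove + free)
 */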
767 
768 int dlm_put_lkb(struct dlm_lkb *lkb)
769 {
770 	struct dlm_ls *ls;
771 
772 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
773 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
774 
775 	ls = lkb->lkb_resource->res_ls;
776 	return __put_lkb(ls, lkb);
777 }
778 
779 /* This is only called to add a reference when the code already holds
780    a valid reference to the lkb, so there's no need for locking. */
781 
782 static inline void hold_lkb(struct dlm_lkb *lkb)
783 {
784 	kref_get(&lkb->lkb_ref);
785 }
786 
787 /* This is called when we need to remove a reference and are certain
788    it's not the last ref.  e.g. del_lkb is always called between a
789    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
790    put_lkb would work fine, but would involve unnecessary locking */
791 
792 static inline void unhold_lkb(struct dlm_lkb *lkb)
793 {
794 	int rv;
795 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
796 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
797 }
798 
799 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
800 			    int mode)
801 {
802 	struct dlm_lkb *lkb = NULL;
803 
804 	list_for_each_entry(lkb, head, lkb_statequeue)
805 		if (lkb->lkb_rqmode < mode)
806 			break;
807 
808 	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
809 }
810 
811 /* add/remove lkb to rsb's grant/convert/wait queue */
812 
813 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
814 {
815 	kref_get(&lkb->lkb_ref);
816 
817 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
818 
819 	lkb->lkb_timestamp = ktime_get();
820 
821 	lkb->lkb_status = status;
822 
823 	switch (status) {
824 	case DLM_LKSTS_WAITING:
825 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
826 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
827 		else
828 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
829 		break;
830 	case DLM_LKSTS_GRANTED:
831 		/* convention says granted locks kept in order of grmode */
832 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
833 				lkb->lkb_grmode);
834 		break;
835 	case DLM_LKSTS_CONVERT:
836 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
837 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
838 		else
839 			list_add_tail(&lkb->lkb_statequeue,
840 				      &r->res_convertqueue);
841 		break;
842 	default:
843 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
844 	}
845 }
846 
847 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
848 {
849 	lkb->lkb_status = 0;
850 	list_del(&lkb->lkb_statequeue);
851 	unhold_lkb(lkb);
852 }
853 
854 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
855 {
856 	hold_lkb(lkb);
857 	del_lkb(r, lkb);
858 	add_lkb(r, lkb, sts);
859 	unhold_lkb(lkb);
860 }
861 
862 static int msg_reply_type(int mstype)
863 {
864 	switch (mstype) {
865 	case DLM_MSG_REQUEST:
866 		return DLM_MSG_REQUEST_REPLY;
867 	case DLM_MSG_CONVERT:
868 		return DLM_MSG_CONVERT_REPLY;
869 	case DLM_MSG_UNLOCK:
870 		return DLM_MSG_UNLOCK_REPLY;
871 	case DLM_MSG_CANCEL:
872 		return DLM_MSG_CANCEL_REPLY;
873 	case DLM_MSG_LOOKUP:
874 		return DLM_MSG_LOOKUP_REPLY;
875 	}
876 	return -1;
877 }
878 
879 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
880 {
881 	int i;
882 
883 	for (i = 0; i < num_nodes; i++) {
884 		if (!warned[i]) {
885 			warned[i] = nodeid;
886 			return 0;
887 		}
888 		if (warned[i] == nodeid)
889 			return 1;
890 	}
891 	return 0;
892 }
893 
894 void dlm_scan_waiters(struct dlm_ls *ls)
895 {
896 	struct dlm_lkb *lkb;
897 	ktime_t zero = ktime_set(0, 0);
898 	s64 us;
899 	s64 debug_maxus = 0;
900 	u32 debug_scanned = 0;
901 	u32 debug_expired = 0;
902 	int num_nodes = 0;
903 	int *warned = NULL;
904 
905 	if (!dlm_config.ci_waitwarn_us)
906 		return;
907 
908 	mutex_lock(&ls->ls_waiters_mutex);
909 
910 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
911 		if (ktime_equal(lkb->lkb_wait_time, zero))
912 			continue;
913 
914 		debug_scanned++;
915 
916 		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
917 
918 		if (us < dlm_config.ci_waitwarn_us)
919 			continue;
920 
921 		lkb->lkb_wait_time = zero;
922 
923 		debug_expired++;
924 		if (us > debug_maxus)
925 			debug_maxus = us;
926 
927 		if (!num_nodes) {
928 			num_nodes = ls->ls_num_nodes;
929 			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
930 		}
931 		if (!warned)
932 			continue;
933 		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
934 			continue;
935 
936 		log_error(ls, "waitwarn %x %lld %d us check connection to "
937 			  "node %d", lkb->lkb_id, (long long)us,
938 			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
939 	}
940 	mutex_unlock(&ls->ls_waiters_mutex);
941 	kfree(warned);
942 
943 	if (debug_expired)
944 		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
945 			  debug_scanned, debug_expired,
946 			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
947 }
948 
949 /* add/remove lkb from global waiters list of lkb's waiting for
950    a reply from a remote node */
951 
952 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
953 {
954 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
955 	int error = 0;
956 
957 	mutex_lock(&ls->ls_waiters_mutex);
958 
959 	if (is_overlap_unlock(lkb) ||
960 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
961 		error = -EINVAL;
962 		goto out;
963 	}
964 
965 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
966 		switch (mstype) {
967 		case DLM_MSG_UNLOCK:
968 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
969 			break;
970 		case DLM_MSG_CANCEL:
971 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
972 			break;
973 		default:
974 			error = -EBUSY;
975 			goto out;
976 		}
977 		lkb->lkb_wait_count++;
978 		hold_lkb(lkb);
979 
980 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
981 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
982 			  lkb->lkb_wait_count, lkb->lkb_flags);
983 		goto out;
984 	}
985 
986 	DLM_ASSERT(!lkb->lkb_wait_count,
987 		   dlm_print_lkb(lkb);
988 		   printk("wait_count %d\n", lkb->lkb_wait_count););
989 
990 	lkb->lkb_wait_count++;
991 	lkb->lkb_wait_type = mstype;
992 	lkb->lkb_wait_time = ktime_get();
993 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
994 	hold_lkb(lkb);
995 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
996  out:
997 	if (error)
998 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
999 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
1000 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1001 	mutex_unlock(&ls->ls_waiters_mutex);
1002 	return error;
1003 }
1004 
1005 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1006    list as part of process_requestqueue (e.g. a lookup that has an optimized
1007    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1008    set RESEND and dlm_recover_waiters_post() */
1009 
1010 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1011 				struct dlm_message *ms)
1012 {
1013 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1014 	int overlap_done = 0;
1015 
1016 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1017 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1018 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1019 		overlap_done = 1;
1020 		goto out_del;
1021 	}
1022 
1023 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1024 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1025 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1026 		overlap_done = 1;
1027 		goto out_del;
1028 	}
1029 
1030 	/* Cancel state was preemptively cleared by a successful convert,
1031 	   see next comment, nothing to do. */
1032 
1033 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1034 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1035 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1036 			  lkb->lkb_id, lkb->lkb_wait_type);
1037 		return -1;
1038 	}
1039 
1040 	/* Remove for the convert reply, and preemptively remove for the
1041 	   cancel reply.  A convert has been granted while there's still
1042 	   an outstanding cancel on it (the cancel is moot and the result
1043 	   in the cancel reply should be 0).  We preempt the cancel reply
1044 	   because the app gets the convert result and then can follow up
1045 	   with another op, like convert.  This subsequent op would see the
1046 	   lingering state of the cancel and fail with -EBUSY. */
1047 
1048 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1049 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1050 	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
1051 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1052 			  lkb->lkb_id);
1053 		lkb->lkb_wait_type = 0;
1054 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1055 		lkb->lkb_wait_count--;
1056 		goto out_del;
1057 	}
1058 
1059 	/* N.B. type of reply may not always correspond to type of original
1060 	   msg due to lookup->request optimization, verify others? */
1061 
1062 	if (lkb->lkb_wait_type) {
1063 		lkb->lkb_wait_type = 0;
1064 		goto out_del;
1065 	}
1066 
1067 	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1068 		  lkb->lkb_id, mstype, lkb->lkb_flags);
1069 	return -1;
1070 
1071  out_del:
1072 	/* the force-unlock/cancel has completed and we haven't recvd a reply
1073 	   to the op that was in progress prior to the unlock/cancel; we
1074 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1075 	   this would happen */
1076 
1077 	if (overlap_done && lkb->lkb_wait_type) {
1078 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1079 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1080 		lkb->lkb_wait_count--;
1081 		lkb->lkb_wait_type = 0;
1082 	}
1083 
1084 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1085 
1086 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
1087 	lkb->lkb_wait_count--;
1088 	if (!lkb->lkb_wait_count)
1089 		list_del_init(&lkb->lkb_wait_reply);
1090 	unhold_lkb(lkb);
1091 	return 0;
1092 }
1093 
1094 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1095 {
1096 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1097 	int error;
1098 
1099 	mutex_lock(&ls->ls_waiters_mutex);
1100 	error = _remove_from_waiters(lkb, mstype, NULL);
1101 	mutex_unlock(&ls->ls_waiters_mutex);
1102 	return error;
1103 }
1104 
1105 /* Handles situations where we might be processing a "fake" or "stub" reply in
1106    which we can't try to take waiters_mutex again. */
1107 
1108 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1109 {
1110 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1111 	int error;
1112 
1113 	if (ms->m_flags != DLM_IFL_STUB_MS)
1114 		mutex_lock(&ls->ls_waiters_mutex);
1115 	error = _remove_from_waiters(lkb, ms->m_type, ms);
1116 	if (ms->m_flags != DLM_IFL_STUB_MS)
1117 		mutex_unlock(&ls->ls_waiters_mutex);
1118 	return error;
1119 }
1120 
1121 static void dir_remove(struct dlm_rsb *r)
1122 {
1123 	int to_nodeid;
1124 
1125 	if (dlm_no_directory(r->res_ls))
1126 		return;
1127 
1128 	to_nodeid = dlm_dir_nodeid(r);
1129 	if (to_nodeid != dlm_our_nodeid())
1130 		send_remove(r);
1131 	else
1132 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
1133 				     r->res_name, r->res_length);
1134 }
1135 
1136 /* FIXME: make this more efficient */
1137 
1138 static int shrink_bucket(struct dlm_ls *ls, int b)
1139 {
1140 	struct rb_node *n;
1141 	struct dlm_rsb *r;
1142 	int count = 0, found;
1143 
1144 	for (;;) {
1145 		found = 0;
1146 		spin_lock(&ls->ls_rsbtbl[b].lock);
1147 		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
1148 			r = rb_entry(n, struct dlm_rsb, res_hashnode);
1149 			if (!time_after_eq(jiffies, r->res_toss_time +
1150 					   dlm_config.ci_toss_secs * HZ))
1151 				continue;
1152 			found = 1;
1153 			break;
1154 		}
1155 
1156 		if (!found) {
1157 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1158 			break;
1159 		}
1160 
1161 		if (kref_put(&r->res_ref, kill_rsb)) {
1162 			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1163 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1164 
1165 			if (is_master(r))
1166 				dir_remove(r);
1167 			dlm_free_rsb(r);
1168 			count++;
1169 		} else {
1170 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1171 			log_error(ls, "tossed rsb in use %s", r->res_name);
1172 		}
1173 	}
1174 
1175 	return count;
1176 }
1177 
1178 void dlm_scan_rsbs(struct dlm_ls *ls)
1179 {
1180 	int i;
1181 
1182 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1183 		shrink_bucket(ls, i);
1184 		if (dlm_locking_stopped(ls))
1185 			break;
1186 		cond_resched();
1187 	}
1188 }
1189 
1190 static void add_timeout(struct dlm_lkb *lkb)
1191 {
1192 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1193 
1194 	if (is_master_copy(lkb))
1195 		return;
1196 
1197 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1198 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1199 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1200 		goto add_it;
1201 	}
1202 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1203 		goto add_it;
1204 	return;
1205 
1206  add_it:
1207 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1208 	mutex_lock(&ls->ls_timeout_mutex);
1209 	hold_lkb(lkb);
1210 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1211 	mutex_unlock(&ls->ls_timeout_mutex);
1212 }
1213 
1214 static void del_timeout(struct dlm_lkb *lkb)
1215 {
1216 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1217 
1218 	mutex_lock(&ls->ls_timeout_mutex);
1219 	if (!list_empty(&lkb->lkb_time_list)) {
1220 		list_del_init(&lkb->lkb_time_list);
1221 		unhold_lkb(lkb);
1222 	}
1223 	mutex_unlock(&ls->ls_timeout_mutex);
1224 }
1225 
1226 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1227    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1228    and then lock rsb because of lock ordering in add_timeout.  We may need
1229    to specify some special timeout-related bits in the lkb that are just to
1230    be accessed under the timeout_mutex. */
1231 
1232 void dlm_scan_timeout(struct dlm_ls *ls)
1233 {
1234 	struct dlm_rsb *r;
1235 	struct dlm_lkb *lkb;
1236 	int do_cancel, do_warn;
1237 	s64 wait_us;
1238 
1239 	for (;;) {
1240 		if (dlm_locking_stopped(ls))
1241 			break;
1242 
1243 		do_cancel = 0;
1244 		do_warn = 0;
1245 		mutex_lock(&ls->ls_timeout_mutex);
1246 		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1247 
1248 			wait_us = ktime_to_us(ktime_sub(ktime_get(),
1249 						lkb->lkb_timestamp));
1250 
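			/* lkb_timeout_cs and ci_timewarn_cs are
			   centiseconds; 1 cs = 10000 us */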
1251 			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1252 			    wait_us >= (lkb->lkb_timeout_cs * 10000))
1253 				do_cancel = 1;
1254 
1255 			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1256 			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
1257 				do_warn = 1;
1258 
1259 			if (!do_cancel && !do_warn)
1260 				continue;
1261 			hold_lkb(lkb);
1262 			break;
1263 		}
1264 		mutex_unlock(&ls->ls_timeout_mutex);
1265 
1266 		if (!do_cancel && !do_warn)
1267 			break;
1268 
1269 		r = lkb->lkb_resource;
1270 		hold_rsb(r);
1271 		lock_rsb(r);
1272 
1273 		if (do_warn) {
1274 			/* clear flag so we only warn once */
1275 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1276 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1277 				del_timeout(lkb);
1278 			dlm_timeout_warn(lkb);
1279 		}
1280 
1281 		if (do_cancel) {
1282 			log_debug(ls, "timeout cancel %x node %d %s",
1283 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1284 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1285 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1286 			del_timeout(lkb);
1287 			_cancel_lock(r, lkb);
1288 		}
1289 
1290 		unlock_rsb(r);
1291 		unhold_rsb(r);
1292 		dlm_put_lkb(lkb);
1293 	}
1294 }
1295 
1296 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1297    dlm_recoverd before checking/setting ls_recover_begin. */
1298 
1299 void dlm_adjust_timeouts(struct dlm_ls *ls)
1300 {
1301 	struct dlm_lkb *lkb;
1302 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1303 
1304 	ls->ls_recover_begin = 0;
1305 	mutex_lock(&ls->ls_timeout_mutex);
1306 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1307 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1308 	mutex_unlock(&ls->ls_timeout_mutex);
1309 
1310 	if (!dlm_config.ci_waitwarn_us)
1311 		return;
1312 
1313 	mutex_lock(&ls->ls_waiters_mutex);
1314 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1315 		if (ktime_to_us(lkb->lkb_wait_time))
1316 			lkb->lkb_wait_time = ktime_get();
1317 	}
1318 	mutex_unlock(&ls->ls_waiters_mutex);
1319 }
1320 
1321 /* lkb is master or local copy */
1322 
1323 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325 	int b, len = r->res_ls->ls_lvblen;
1326 
1327 	/* b=1 lvb returned to caller
1328 	   b=0 lvb written to rsb or invalidated
1329 	   b=-1 do nothing */
1330 
1331 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1332 
1333 	if (b == 1) {
1334 		if (!lkb->lkb_lvbptr)
1335 			return;
1336 
1337 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1338 			return;
1339 
1340 		if (!r->res_lvbptr)
1341 			return;
1342 
1343 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1344 		lkb->lkb_lvbseq = r->res_lvbseq;
1345 
1346 	} else if (b == 0) {
1347 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1348 			rsb_set_flag(r, RSB_VALNOTVALID);
1349 			return;
1350 		}
1351 
1352 		if (!lkb->lkb_lvbptr)
1353 			return;
1354 
1355 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1356 			return;
1357 
1358 		if (!r->res_lvbptr)
1359 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1360 
1361 		if (!r->res_lvbptr)
1362 			return;
1363 
1364 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1365 		r->res_lvbseq++;
1366 		lkb->lkb_lvbseq = r->res_lvbseq;
1367 		rsb_clear_flag(r, RSB_VALNOTVALID);
1368 	}
1369 
1370 	if (rsb_flag(r, RSB_VALNOTVALID))
1371 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1372 }
1373 
1374 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1375 {
1376 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1377 		return;
1378 
1379 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1380 		rsb_set_flag(r, RSB_VALNOTVALID);
1381 		return;
1382 	}
1383 
1384 	if (!lkb->lkb_lvbptr)
1385 		return;
1386 
1387 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1388 		return;
1389 
1390 	if (!r->res_lvbptr)
1391 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1392 
1393 	if (!r->res_lvbptr)
1394 		return;
1395 
1396 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1397 	r->res_lvbseq++;
1398 	rsb_clear_flag(r, RSB_VALNOTVALID);
1399 }
1400 
1401 /* lkb is process copy (pc) */
1402 
1403 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1404 			    struct dlm_message *ms)
1405 {
1406 	int b;
1407 
1408 	if (!lkb->lkb_lvbptr)
1409 		return;
1410 
1411 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1412 		return;
1413 
1414 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1415 	if (b == 1) {
1416 		int len = receive_extralen(ms);
1417 		if (len > DLM_RESNAME_MAXLEN)
1418 			len = DLM_RESNAME_MAXLEN;
1419 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1420 		lkb->lkb_lvbseq = ms->m_lvbseq;
1421 	}
1422 }
1423 
1424 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1425    remove_lock -- used for unlock, removes lkb from granted
1426    revert_lock -- used for cancel, moves lkb from convert to granted
1427    grant_lock  -- used for request and convert, adds lkb to granted or
1428                   moves lkb from convert or waiting to granted
1429 
1430    Each of these is used for master or local copy lkb's.  There is
1431    also a _pc() variation used to make the corresponding change on
1432    a process copy (pc) lkb. */
1433 
1434 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436 	del_lkb(r, lkb);
1437 	lkb->lkb_grmode = DLM_LOCK_IV;
1438 	/* this unhold undoes the original ref from create_lkb()
1439 	   so this leads to the lkb being freed */
1440 	unhold_lkb(lkb);
1441 }
1442 
1443 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1444 {
1445 	set_lvb_unlock(r, lkb);
1446 	_remove_lock(r, lkb);
1447 }
1448 
1449 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1450 {
1451 	_remove_lock(r, lkb);
1452 }
1453 
1454 /* returns: 0 did nothing
1455 	    1 moved lock to granted
1456 	   -1 removed lock */
1457 
1458 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1459 {
1460 	int rv = 0;
1461 
1462 	lkb->lkb_rqmode = DLM_LOCK_IV;
1463 
1464 	switch (lkb->lkb_status) {
1465 	case DLM_LKSTS_GRANTED:
1466 		break;
1467 	case DLM_LKSTS_CONVERT:
1468 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1469 		rv = 1;
1470 		break;
1471 	case DLM_LKSTS_WAITING:
1472 		del_lkb(r, lkb);
1473 		lkb->lkb_grmode = DLM_LOCK_IV;
1474 		/* this unhold undoes the original ref from create_lkb()
1475 		   so this leads to the lkb being freed */
1476 		unhold_lkb(lkb);
1477 		rv = -1;
1478 		break;
1479 	default:
1480 		log_print("invalid status for revert %d", lkb->lkb_status);
1481 	}
1482 	return rv;
1483 }
1484 
1485 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1486 {
1487 	return revert_lock(r, lkb);
1488 }
1489 
1490 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1491 {
1492 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1493 		lkb->lkb_grmode = lkb->lkb_rqmode;
1494 		if (lkb->lkb_status)
1495 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1496 		else
1497 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1498 	}
1499 
1500 	lkb->lkb_rqmode = DLM_LOCK_IV;
1501 }
1502 
1503 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1504 {
1505 	set_lvb_lock(r, lkb);
1506 	_grant_lock(r, lkb);
1507 	lkb->lkb_highbast = 0;
1508 }
1509 
1510 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1511 			  struct dlm_message *ms)
1512 {
1513 	set_lvb_lock_pc(r, lkb, ms);
1514 	_grant_lock(r, lkb);
1515 }
1516 
1517 /* called by grant_pending_locks() which means an async grant message must
1518    be sent to the requesting node in addition to granting the lock if the
1519    lkb belongs to a remote node. */
1520 
1521 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1522 {
1523 	grant_lock(r, lkb);
1524 	if (is_master_copy(lkb))
1525 		send_grant(r, lkb);
1526 	else
1527 		queue_cast(r, lkb, 0);
1528 }
1529 
1530 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1531    change the granted/requested modes.  We're munging things accordingly in
1532    the process copy.
1533    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1534    conversion deadlock
1535    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1536    compatible with other granted locks */
1537 
1538 static void munge_demoted(struct dlm_lkb *lkb)
1539 {
1540 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1541 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1542 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1543 		return;
1544 	}
1545 
1546 	lkb->lkb_grmode = DLM_LOCK_NL;
1547 }
1548 
1549 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1550 {
1551 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1552 	    ms->m_type != DLM_MSG_GRANT) {
1553 		log_print("munge_altmode %x invalid reply type %d",
1554 			  lkb->lkb_id, ms->m_type);
1555 		return;
1556 	}
1557 
1558 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1559 		lkb->lkb_rqmode = DLM_LOCK_PR;
1560 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1561 		lkb->lkb_rqmode = DLM_LOCK_CW;
1562 	else {
1563 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1564 		dlm_print_lkb(lkb);
1565 	}
1566 }
1567 
1568 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1569 {
1570 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1571 					   lkb_statequeue);
1572 	if (lkb->lkb_id == first->lkb_id)
1573 		return 1;
1574 
1575 	return 0;
1576 }
1577 
1578 /* Check if the given lkb conflicts with another lkb on the queue. */
1579 
1580 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1581 {
1582 	struct dlm_lkb *this;
1583 
1584 	list_for_each_entry(this, head, lkb_statequeue) {
1585 		if (this == lkb)
1586 			continue;
1587 		if (!modes_compat(this, lkb))
1588 			return 1;
1589 	}
1590 	return 0;
1591 }
1592 
1593 /*
1594  * "A conversion deadlock arises with a pair of lock requests in the converting
1595  * queue for one resource.  The granted mode of each lock blocks the requested
1596  * mode of the other lock."
1597  *
1598  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1599  * convert queue from being granted, then deadlk/demote lkb.
1600  *
1601  * Example:
1602  * Granted Queue: empty
1603  * Convert Queue: NL->EX (first lock)
1604  *                PR->EX (second lock)
1605  *
1606  * The first lock can't be granted because of the granted mode of the second
1607  * lock and the second lock can't be granted because it's not first in the
1608  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1609  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1610  * flag set and return DEMOTED in the lksb flags.
1611  *
1612  * Originally, this function detected conv-deadlk in a more limited scope:
1613  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1614  * - if lkb1 was the first entry in the queue (not just earlier), and was
1615  *   blocked by the granted mode of lkb2, and there was nothing on the
1616  *   granted queue preventing lkb1 from being granted immediately, i.e.
1617  *   lkb2 was the only thing preventing lkb1 from being granted.
1618  *
1619  * That second condition meant we'd only say there was conv-deadlk if
1620  * resolving it (by demotion) would lead to the first lock on the convert
1621  * queue being granted right away.  It allowed conversion deadlocks to exist
1622  * between locks on the convert queue while they couldn't be granted anyway.
1623  *
1624  * Now, we detect and take action on conversion deadlocks immediately when
1625  * they're created, even if they may not be immediately consequential.  If
1626  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1627  * mode that would prevent lkb1's conversion from being granted, we do a
1628  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1629  * I think this means that the lkb_is_ahead condition below should always
1630  * be zero, i.e. there will never be conv-deadlk between two locks that are
1631  * both already on the convert queue.
1632  */
1633 
1634 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1635 {
1636 	struct dlm_lkb *lkb1;
1637 	int lkb_is_ahead = 0;
1638 
1639 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1640 		if (lkb1 == lkb2) {
1641 			lkb_is_ahead = 1;
1642 			continue;
1643 		}
1644 
1645 		if (!lkb_is_ahead) {
1646 			if (!modes_compat(lkb2, lkb1))
1647 				return 1;
1648 		} else {
1649 			if (!modes_compat(lkb2, lkb1) &&
1650 			    !modes_compat(lkb1, lkb2))
1651 				return 1;
1652 		}
1653 	}
1654 	return 0;
1655 }
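/*
 * Worked example for the queues in the comment above: the convert queue
 * holds lkb1 (NL->EX) and lkb2 arrives converting PR->EX.  Walking the
 * queue, lkb1 is reached while lkb_is_ahead is still 0, and
 * !modes_compat(lkb2, lkb1) asks whether lkb2's granted PR blocks lkb1's
 * requested EX -- it does, so we return 1 and can_be_granted() resolves
 * the deadlock by demoting lkb2 (CONVDEADLK) or failing it with -EDEADLK.
 */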
1656 
1657 /*
1658  * Return 1 if the lock can be granted, 0 otherwise.
1659  * Also detect and resolve conversion deadlocks.
1660  *
1661  * lkb is the lock to be granted
1662  *
1663  * now is 1 if the function is being called in the context of the
1664  * immediate request, it is 0 if called later, after the lock has been
1665  * queued.
1666  *
1667  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1668  */
1669 
1670 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1671 {
1672 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1673 
1674 	/*
1675 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1676 	 * a new request for a NL mode lock being blocked.
1677 	 *
1678 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1679 	 * request, then it would be granted.  In essence, the use of this flag
1680 	 * tells the Lock Manager to expedite this request by not considering
1681 	 * what may be in the CONVERTING or WAITING queues...  As of this
1682 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1683 	 * mode locks.  This flag is not valid for conversion requests.
1684 	 *
1685 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1686 	 * conversion or used with a non-NL requested mode.  We also know an
1687 	 * EXPEDITE request is always granted immediately, so now must always
1688 	 * be 1.  The full condition to grant an expedite request: (now &&
1689 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1690 	 * therefore be shortened to just checking the flag.
1691 	 */
1692 
1693 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1694 		return 1;
1695 
1696 	/*
1697 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1698 	 * added to the remaining conditions.
1699 	 */
1700 
1701 	if (queue_conflict(&r->res_grantqueue, lkb))
1702 		goto out;
1703 
1704 	/*
1705 	 * 6-3: By default, a conversion request is immediately granted if the
1706 	 * requested mode is compatible with the modes of all other granted
1707 	 * locks
1708 	 */
1709 
1710 	if (queue_conflict(&r->res_convertqueue, lkb))
1711 		goto out;
1712 
1713 	/*
1714 	 * 6-5: But the default algorithm for deciding whether to grant or
1715 	 * queue conversion requests does not by itself guarantee that such
1716 	 * requests are serviced on a "first come first serve" basis.  This, in
1717 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1718 	 *
1719 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1720 	 * the system service employed to request a lock conversion.  This flag
1721 	 * forces certain conversion requests to be queued, even if they are
1722 	 * compatible with the granted modes of other locks on the same
1723 	 * resource.  Thus, the use of this flag results in conversion requests
1724 	 * being ordered on a "first come first serve" basis.
1725 	 *
1726 	 * DCT: This condition is all about new conversions being able to occur
1727 	 * "in place" while the lock remains on the granted queue (assuming
1728 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1729 	 * doesn't _have_ to go onto the convert queue where it's processed in
1730 	 * order.  The "now" variable is necessary to distinguish converts
1731 	 * being received and processed for the first time now, because once a
1732 	 * convert is moved to the conversion queue the condition below applies
1733 	 * requiring fifo granting.
1734 	 */
1735 
1736 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1737 		return 1;
1738 
1739 	/*
1740 	 * Even if the convert is compat with all granted locks,
1741 	 * QUECVT forces it behind other locks on the convert queue.
1742 	 */
1743 
1744 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
1745 		if (list_empty(&r->res_convertqueue))
1746 			return 1;
1747 		else
1748 			goto out;
1749 	}
1750 
1751 	/*
1752 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1753 	 * order.
1754 	 */
1755 
1756 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1757 		return 1;
1758 
1759 	/*
1760 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1761 	 * granted until all other conversion requests ahead of it are granted
1762 	 * and/or canceled.
1763 	 */
1764 
1765 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1766 		return 1;
1767 
1768 	/*
1769 	 * 6-4: By default, a new request is immediately granted only if all
1770 	 * three of the following conditions are satisfied when the request is
1771 	 * issued:
1772 	 * - The queue of ungranted conversion requests for the resource is
1773 	 *   empty.
1774 	 * - The queue of ungranted new requests for the resource is empty.
1775 	 * - The mode of the new request is compatible with the most
1776 	 *   restrictive mode of all granted locks on the resource.
1777 	 */
1778 
1779 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1780 	    list_empty(&r->res_waitqueue))
1781 		return 1;
1782 
1783 	/*
1784 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1785 	 * it cannot be granted until the queue of ungranted conversion
1786 	 * requests is empty, all ungranted new requests ahead of it are
1787 	 * granted and/or canceled, and it is compatible with the granted mode
1788 	 * of the most restrictive lock granted on the resource.
1789 	 */
1790 
1791 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1792 	    first_in_list(lkb, &r->res_waitqueue))
1793 		return 1;
1794  out:
1795 	return 0;
1796 }
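/*
 * Worked examples against the rules above (hypothetical locks/queues):
 *
 * granted queue holds one PR lock; convert and wait queues are empty.
 *   - new PR request, now=1: no queue_conflict() with the granted PR
 *     and both queues are empty -> granted immediately (rule 6-4).
 *   - new CW request, now=1: CW conflicts with the granted PR in
 *     __dlm_compat_matrix -> falls through to "out", not granted.
 *
 * conversion PR->CW, now=1, QUECVT not set: if no other granted or
 * converting lock conflicts with CW, it is granted "in place" without
 * ever joining the convert queue (the DCT comment above).
 */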
1797 
1798 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1799 			  int *err)
1800 {
1801 	int rv;
1802 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1803 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1804 
1805 	if (err)
1806 		*err = 0;
1807 
1808 	rv = _can_be_granted(r, lkb, now);
1809 	if (rv)
1810 		goto out;
1811 
1812 	/*
1813 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1814 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1815 	 * cancels one of the locks.
1816 	 */
1817 
1818 	if (is_convert && can_be_queued(lkb) &&
1819 	    conversion_deadlock_detect(r, lkb)) {
1820 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1821 			lkb->lkb_grmode = DLM_LOCK_NL;
1822 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1823 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1824 			if (err)
1825 				*err = -EDEADLK;
1826 			else {
1827 				log_print("can_be_granted deadlock %x now %d",
1828 					  lkb->lkb_id, now);
1829 				dlm_dump_rsb(r);
1830 			}
1831 		}
1832 		goto out;
1833 	}
1834 
1835 	/*
1836 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1837 	 * to grant a request in a mode other than the normal rqmode.  It's a
1838 	 * simple way to provide a big optimization to applications that can
1839 	 * use them.
1840 	 */
1841 
1842 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1843 		alt = DLM_LOCK_PR;
1844 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1845 		alt = DLM_LOCK_CW;
1846 
1847 	if (alt) {
1848 		lkb->lkb_rqmode = alt;
1849 		rv = _can_be_granted(r, lkb, now);
1850 		if (rv)
1851 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1852 		else
1853 			lkb->lkb_rqmode = rqmode;
1854 	}
1855  out:
1856 	return rv;
1857 }
1858 
1859 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1860    for locks pending on the convert list.  Once verified (watch for these
1861    log_prints), we should be able to just call _can_be_granted() and not
1862    bother with the demote/deadlk cases here (and there's no easy way to deal
1863    with a deadlk here, we'd have to generate something like grant_lock with
1864    the deadlk error.) */
1865 
1866 /* Returns the highest requested mode of all blocked conversions; sets
1867    cw if there's a blocked conversion to DLM_LOCK_CW. */
1868 
1869 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1870 {
1871 	struct dlm_lkb *lkb, *s;
1872 	int hi, demoted, quit, grant_restart, demote_restart;
1873 	int deadlk;
1874 
1875 	quit = 0;
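	/* Two reasons to rescan from the top: granting a convert changes
	   the set of granted modes and makes the next entry first on the
	   convert queue (grant_restart), and a CONVDEADLK demotion to NL
	   may make other converts grantable (demote_restart, limited to
	   one extra pass by "quit"). */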
1876  restart:
1877 	grant_restart = 0;
1878 	demote_restart = 0;
1879 	hi = DLM_LOCK_IV;
1880 
1881 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1882 		demoted = is_demoted(lkb);
1883 		deadlk = 0;
1884 
1885 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1886 			grant_lock_pending(r, lkb);
1887 			grant_restart = 1;
1888 			continue;
1889 		}
1890 
1891 		if (!demoted && is_demoted(lkb)) {
1892 			log_print("WARN: pending demoted %x node %d %s",
1893 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1894 			demote_restart = 1;
1895 			continue;
1896 		}
1897 
1898 		if (deadlk) {
1899 			log_print("WARN: pending deadlock %x node %d %s",
1900 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1901 			dlm_dump_rsb(r);
1902 			continue;
1903 		}
1904 
1905 		hi = max_t(int, lkb->lkb_rqmode, hi);
1906 
1907 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1908 			*cw = 1;
1909 	}
1910 
1911 	if (grant_restart)
1912 		goto restart;
1913 	if (demote_restart && !quit) {
1914 		quit = 1;
1915 		goto restart;
1916 	}
1917 
1918 	return max_t(int, high, hi);
1919 }
1920 
1921 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1922 {
1923 	struct dlm_lkb *lkb, *s;
1924 
1925 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1926 		if (can_be_granted(r, lkb, 0, NULL))
1927 			grant_lock_pending(r, lkb);
1928 		else {
1929 			high = max_t(int, lkb->lkb_rqmode, high);
1930 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1931 				*cw = 1;
1932 		}
1933 	}
1934 
1935 	return high;
1936 }
1937 
1938 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1939    on either the convert or waiting queue.
1940    high is the largest rqmode of all locks blocked on the convert or
1941    waiting queue. */
1942 
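/* The first test below covers the PR/CW special case: PR and CW are each
   self-compatible but mutually incompatible, so a granted PR lock can
   block a waiting CW request even when "high" is a mode, such as PR,
   that is compatible with the holder's granted mode.  In that case
   grant_pending_locks() sends the holder a bast with mode CW rather
   than high. */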
1943 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1944 {
1945 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1946 		if (gr->lkb_highbast < DLM_LOCK_EX)
1947 			return 1;
1948 		return 0;
1949 	}
1950 
1951 	if (gr->lkb_highbast < high &&
1952 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1953 		return 1;
1954 	return 0;
1955 }
1956 
1957 static void grant_pending_locks(struct dlm_rsb *r)
1958 {
1959 	struct dlm_lkb *lkb, *s;
1960 	int high = DLM_LOCK_IV;
1961 	int cw = 0;
1962 
1963 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1964 
1965 	high = grant_pending_convert(r, high, &cw);
1966 	high = grant_pending_wait(r, high, &cw);
1967 
1968 	if (high == DLM_LOCK_IV)
1969 		return;
1970 
1971 	/*
1972 	 * If there are locks left on the wait/convert queue then send blocking
1973 	 * ASTs to granted locks based on the largest requested mode (high)
1974 	 * found above.
1975 	 */
1976 
1977 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1978 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1979 			if (cw && high == DLM_LOCK_PR &&
1980 			    lkb->lkb_grmode == DLM_LOCK_PR)
1981 				queue_bast(r, lkb, DLM_LOCK_CW);
1982 			else
1983 				queue_bast(r, lkb, high);
1984 			lkb->lkb_highbast = high;
1985 		}
1986 	}
1987 }
1988 
1989 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1990 {
1991 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1992 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1993 		if (gr->lkb_highbast < DLM_LOCK_EX)
1994 			return 1;
1995 		return 0;
1996 	}
1997 
1998 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1999 		return 1;
2000 	return 0;
2001 }
2002 
2003 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2004 			    struct dlm_lkb *lkb)
2005 {
2006 	struct dlm_lkb *gr;
2007 
2008 	list_for_each_entry(gr, head, lkb_statequeue) {
2009 		/* skip self when sending basts to convertqueue */
2010 		if (gr == lkb)
2011 			continue;
2012 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2013 			queue_bast(r, gr, lkb->lkb_rqmode);
2014 			gr->lkb_highbast = lkb->lkb_rqmode;
2015 		}
2016 	}
2017 }
2018 
2019 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 {
2021 	send_bast_queue(r, &r->res_grantqueue, lkb);
2022 }
2023 
2024 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2025 {
2026 	send_bast_queue(r, &r->res_grantqueue, lkb);
2027 	send_bast_queue(r, &r->res_convertqueue, lkb);
2028 }
2029 
2030 /* set_master(r, lkb) -- set the master nodeid of a resource
2031 
2032    The purpose of this function is to set the nodeid field in the given
2033    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2034    known, it can just be copied to the lkb and the function will return
2035    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2036    before it can be copied to the lkb.
2037 
2038    When the rsb nodeid is being looked up remotely, the initial lkb
2039    causing the lookup is kept on the ls_waiters list waiting for the
2040    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2041    on the rsb's res_lookup list until the master is verified.
2042 
2043    Return values:
2044    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2045    1: the rsb master is not available and the lkb has been placed on
2046       a wait queue
2047 */
2048 
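/* res_nodeid conventions relied on below: 0 means this node is the
   master, a positive value is the remote master's nodeid, and -1 means
   the master is unknown and must be found through the resource
   directory. */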
2049 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051 	struct dlm_ls *ls = r->res_ls;
2052 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2053 
2054 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2055 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2056 		r->res_first_lkid = lkb->lkb_id;
2057 		lkb->lkb_nodeid = r->res_nodeid;
2058 		return 0;
2059 	}
2060 
2061 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2062 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2063 		return 1;
2064 	}
2065 
2066 	if (r->res_nodeid == 0) {
2067 		lkb->lkb_nodeid = 0;
2068 		return 0;
2069 	}
2070 
2071 	if (r->res_nodeid > 0) {
2072 		lkb->lkb_nodeid = r->res_nodeid;
2073 		return 0;
2074 	}
2075 
2076 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2077 
2078 	dir_nodeid = dlm_dir_nodeid(r);
2079 
2080 	if (dir_nodeid != our_nodeid) {
2081 		r->res_first_lkid = lkb->lkb_id;
2082 		send_lookup(r, lkb);
2083 		return 1;
2084 	}
2085 
2086 	for (i = 0; i < 2; i++) {
2087 		/* It's possible for dlm_scand to remove an old rsb for
2088 		   this same resource from the toss list, for us to create
2089 		   a new one, look up the master locally, and find that it
2090 		   already exists just before dlm_scand does the
2091 		   dir_remove() on the previous rsb. */
2092 
2093 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2094 				       r->res_length, &ret_nodeid);
2095 		if (!error)
2096 			break;
2097 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2098 		schedule();
2099 	}
2100 	if (error && error != -EEXIST)
2101 		return error;
2102 
2103 	if (ret_nodeid == our_nodeid) {
2104 		r->res_first_lkid = 0;
2105 		r->res_nodeid = 0;
2106 		lkb->lkb_nodeid = 0;
2107 	} else {
2108 		r->res_first_lkid = lkb->lkb_id;
2109 		r->res_nodeid = ret_nodeid;
2110 		lkb->lkb_nodeid = ret_nodeid;
2111 	}
2112 	return 0;
2113 }
2114 
2115 static void process_lookup_list(struct dlm_rsb *r)
2116 {
2117 	struct dlm_lkb *lkb, *safe;
2118 
2119 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2120 		list_del_init(&lkb->lkb_rsb_lookup);
2121 		_request_lock(r, lkb);
2122 		schedule();
2123 	}
2124 }
2125 
2126 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2127 
2128 static void confirm_master(struct dlm_rsb *r, int error)
2129 {
2130 	struct dlm_lkb *lkb;
2131 
2132 	if (!r->res_first_lkid)
2133 		return;
2134 
2135 	switch (error) {
2136 	case 0:
2137 	case -EINPROGRESS:
2138 		r->res_first_lkid = 0;
2139 		process_lookup_list(r);
2140 		break;
2141 
2142 	case -EAGAIN:
2143 	case -EBADR:
2144 	case -ENOTBLK:
2145 		/* the remote request failed and won't be retried (it was
2146 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2147 		   lkb the first_lkid */
2148 
2149 		r->res_first_lkid = 0;
2150 
2151 		if (!list_empty(&r->res_lookup)) {
2152 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2153 					 lkb_rsb_lookup);
2154 			list_del_init(&lkb->lkb_rsb_lookup);
2155 			r->res_first_lkid = lkb->lkb_id;
2156 			_request_lock(r, lkb);
2157 		}
2158 		break;
2159 
2160 	default:
2161 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2162 	}
2163 }
2164 
2165 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2166 			 int namelen, unsigned long timeout_cs,
2167 			 void (*ast) (void *astparam),
2168 			 void *astparam,
2169 			 void (*bast) (void *astparam, int mode),
2170 			 struct dlm_args *args)
2171 {
2172 	int rv = -EINVAL;
2173 
2174 	/* check for invalid arg usage */
2175 
2176 	if (mode < 0 || mode > DLM_LOCK_EX)
2177 		goto out;
2178 
2179 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2180 		goto out;
2181 
2182 	if (flags & DLM_LKF_CANCEL)
2183 		goto out;
2184 
2185 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2186 		goto out;
2187 
2188 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2189 		goto out;
2190 
2191 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2192 		goto out;
2193 
2194 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2195 		goto out;
2196 
2197 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2198 		goto out;
2199 
2200 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2201 		goto out;
2202 
2203 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2204 		goto out;
2205 
2206 	if (!ast || !lksb)
2207 		goto out;
2208 
2209 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2210 		goto out;
2211 
2212 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2213 		goto out;
2214 
2215 	/* these args will be copied to the lkb in validate_lock_args,
2216 	   it cannot be done now because when converting locks, fields in
2217 	   an active lkb cannot be modified before locking the rsb */
2218 
2219 	args->flags = flags;
2220 	args->astfn = ast;
2221 	args->astparam = astparam;
2222 	args->bastfn = bast;
2223 	args->timeout = timeout_cs;
2224 	args->mode = mode;
2225 	args->lksb = lksb;
2226 	rv = 0;
2227  out:
2228 	return rv;
2229 }
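
/* Illustrative only (not part of the original source; lksb, len, ast,
 * arg and bast are placeholders): flag combinations accepted and
 * rejected by the checks above.
 *
 *	set_lock_args(DLM_LOCK_EX, &lksb, DLM_LKF_NOQUEUE, len, 0,
 *		      ast, arg, bast, &args);    accepted: new EX request
 *
 *	set_lock_args(DLM_LOCK_NL, &lksb,
 *		      DLM_LKF_EXPEDITE | DLM_LKF_NOQUEUE, len, 0,
 *		      ast, arg, bast, &args);    rejected: EXPEDITE may
 *						 not be combined with
 *						 NOQUEUE
 */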
2230 
2231 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2232 {
2233 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2234 		      DLM_LKF_FORCEUNLOCK))
2235 		return -EINVAL;
2236 
2237 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2238 		return -EINVAL;
2239 
2240 	args->flags = flags;
2241 	args->astparam = astarg;
2242 	return 0;
2243 }
2244 
2245 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2246 			      struct dlm_args *args)
2247 {
2248 	int rv = -EINVAL;
2249 
2250 	if (args->flags & DLM_LKF_CONVERT) {
2251 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2252 			goto out;
2253 
2254 		if (args->flags & DLM_LKF_QUECVT &&
2255 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2256 			goto out;
2257 
2258 		rv = -EBUSY;
2259 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2260 			goto out;
2261 
2262 		if (lkb->lkb_wait_type)
2263 			goto out;
2264 
2265 		if (is_overlap(lkb))
2266 			goto out;
2267 	}
2268 
2269 	lkb->lkb_exflags = args->flags;
2270 	lkb->lkb_sbflags = 0;
2271 	lkb->lkb_astfn = args->astfn;
2272 	lkb->lkb_astparam = args->astparam;
2273 	lkb->lkb_bastfn = args->bastfn;
2274 	lkb->lkb_rqmode = args->mode;
2275 	lkb->lkb_lksb = args->lksb;
2276 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2277 	lkb->lkb_ownpid = (int) current->pid;
2278 	lkb->lkb_timeout_cs = args->timeout;
2279 	rv = 0;
2280  out:
2281 	if (rv)
2282 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2283 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2284 			  lkb->lkb_status, lkb->lkb_wait_type,
2285 			  lkb->lkb_resource->res_name);
2286 	return rv;
2287 }
2288 
2289 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2290    for success */
2291 
2292 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2293    because there may be a lookup in progress and it's valid to do
2294    cancel/force-unlock on it */
2295 
2296 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2297 {
2298 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2299 	int rv = -EINVAL;
2300 
2301 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2302 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2303 		dlm_print_lkb(lkb);
2304 		goto out;
2305 	}
2306 
2307 	/* an lkb may still exist even though the lock is EOL'ed due to a
2308 	   cancel, unlock or failed noqueue request; an app can't use these
2309 	   locks; return same error as if the lkid had not been found at all */
2310 
2311 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2312 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2313 		rv = -ENOENT;
2314 		goto out;
2315 	}
2316 
2317 	/* an lkb may be waiting for an rsb lookup to complete where the
2318 	   lookup was initiated by another lock */
2319 
2320 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2321 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2322 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2323 			list_del_init(&lkb->lkb_rsb_lookup);
2324 			queue_cast(lkb->lkb_resource, lkb,
2325 				   args->flags & DLM_LKF_CANCEL ?
2326 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2327 			unhold_lkb(lkb); /* undoes create_lkb() */
2328 		}
2329 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2330 		rv = -EBUSY;
2331 		goto out;
2332 	}
2333 
2334 	/* cancel not allowed with another cancel/unlock in progress */
2335 
2336 	if (args->flags & DLM_LKF_CANCEL) {
2337 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2338 			goto out;
2339 
2340 		if (is_overlap(lkb))
2341 			goto out;
2342 
2343 		/* don't let scand try to do a cancel */
2344 		del_timeout(lkb);
2345 
2346 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2347 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2348 			rv = -EBUSY;
2349 			goto out;
2350 		}
2351 
2352 		/* there's nothing to cancel */
2353 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2354 		    !lkb->lkb_wait_type) {
2355 			rv = -EBUSY;
2356 			goto out;
2357 		}
2358 
2359 		switch (lkb->lkb_wait_type) {
2360 		case DLM_MSG_LOOKUP:
2361 		case DLM_MSG_REQUEST:
2362 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2363 			rv = -EBUSY;
2364 			goto out;
2365 		case DLM_MSG_UNLOCK:
2366 		case DLM_MSG_CANCEL:
2367 			goto out;
2368 		}
2369 		/* add_to_waiters() will set OVERLAP_CANCEL */
2370 		goto out_ok;
2371 	}
2372 
2373 	/* do we need to allow a force-unlock if there's a normal unlock
2374 	   already in progress?  under what conditions could the normal unlock
2375 	   fail such that we'd want to send a force-unlock to be sure? */
2376 
2377 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2378 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2379 			goto out;
2380 
2381 		if (is_overlap_unlock(lkb))
2382 			goto out;
2383 
2384 		/* don't let scand try to do a cancel */
2385 		del_timeout(lkb);
2386 
2387 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2388 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2389 			rv = -EBUSY;
2390 			goto out;
2391 		}
2392 
2393 		switch (lkb->lkb_wait_type) {
2394 		case DLM_MSG_LOOKUP:
2395 		case DLM_MSG_REQUEST:
2396 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2397 			rv = -EBUSY;
2398 			goto out;
2399 		case DLM_MSG_UNLOCK:
2400 			goto out;
2401 		}
2402 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2403 		goto out_ok;
2404 	}
2405 
2406 	/* normal unlock not allowed if there's any op in progress */
2407 	rv = -EBUSY;
2408 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2409 		goto out;
2410 
2411  out_ok:
2412 	/* an overlapping op shouldn't blow away exflags from other op */
2413 	lkb->lkb_exflags |= args->flags;
2414 	lkb->lkb_sbflags = 0;
2415 	lkb->lkb_astparam = args->astparam;
2416 	rv = 0;
2417  out:
2418 	if (rv)
2419 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2420 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2421 			  args->flags, lkb->lkb_wait_type,
2422 			  lkb->lkb_resource->res_name);
2423 	return rv;
2424 }
2425 
2426 /*
2427  * Four stage 4 varieties:
2428  * do_request(), do_convert(), do_unlock(), do_cancel()
2429  * These are called on the master node for the given lock and
2430  * from the central locking logic.
2431  */
2432 
2433 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2434 {
2435 	int error = 0;
2436 
2437 	if (can_be_granted(r, lkb, 1, NULL)) {
2438 		grant_lock(r, lkb);
2439 		queue_cast(r, lkb, 0);
2440 		goto out;
2441 	}
2442 
2443 	if (can_be_queued(lkb)) {
2444 		error = -EINPROGRESS;
2445 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2446 		add_timeout(lkb);
2447 		goto out;
2448 	}
2449 
2450 	error = -EAGAIN;
2451 	queue_cast(r, lkb, -EAGAIN);
2452  out:
2453 	return error;
2454 }
2455 
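/* do_request() outcomes handled here: 0 means the lock was granted and a
   completion ast queued; -EINPROGRESS means it was put on the waitqueue;
   -EAGAIN means a NOQUEUE request could not be granted and an -EAGAIN
   ast was queued instead. */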
2456 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2457 			       int error)
2458 {
2459 	switch (error) {
2460 	case -EAGAIN:
2461 		if (force_blocking_asts(lkb))
2462 			send_blocking_asts_all(r, lkb);
2463 		break;
2464 	case -EINPROGRESS:
2465 		send_blocking_asts(r, lkb);
2466 		break;
2467 	}
2468 }
2469 
2470 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2471 {
2472 	int error = 0;
2473 	int deadlk = 0;
2474 
2475 	/* changing an existing lock may allow others to be granted */
2476 
2477 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2478 		grant_lock(r, lkb);
2479 		queue_cast(r, lkb, 0);
2480 		goto out;
2481 	}
2482 
2483 	/* can_be_granted() detected that this lock would block in a conversion
2484 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2485 	   the ast for the convert. */
2486 
2487 	if (deadlk) {
2488 		/* it's left on the granted queue */
2489 		revert_lock(r, lkb);
2490 		queue_cast(r, lkb, -EDEADLK);
2491 		error = -EDEADLK;
2492 		goto out;
2493 	}
2494 
2495 	/* is_demoted() means the can_be_granted() above set the grmode
2496 	   to NL, and left us on the granted queue.  This auto-demotion
2497 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2498 	   now grantable.  We have to try to grant other converting locks
2499 	   before we try again to grant this one. */
2500 
2501 	if (is_demoted(lkb)) {
2502 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2503 		if (_can_be_granted(r, lkb, 1)) {
2504 			grant_lock(r, lkb);
2505 			queue_cast(r, lkb, 0);
2506 			goto out;
2507 		}
2508 		/* else fall through and move to convert queue */
2509 	}
2510 
2511 	if (can_be_queued(lkb)) {
2512 		error = -EINPROGRESS;
2513 		del_lkb(r, lkb);
2514 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2515 		add_timeout(lkb);
2516 		goto out;
2517 	}
2518 
2519 	error = -EAGAIN;
2520 	queue_cast(r, lkb, -EAGAIN);
2521  out:
2522 	return error;
2523 }
2524 
2525 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2526 			       int error)
2527 {
2528 	switch (error) {
2529 	case 0:
2530 		grant_pending_locks(r);
2531 		/* grant_pending_locks also sends basts */
2532 		break;
2533 	case -EAGAIN:
2534 		if (force_blocking_asts(lkb))
2535 			send_blocking_asts_all(r, lkb);
2536 		break;
2537 	case -EINPROGRESS:
2538 		send_blocking_asts(r, lkb);
2539 		break;
2540 	}
2541 }
2542 
2543 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2544 {
2545 	remove_lock(r, lkb);
2546 	queue_cast(r, lkb, -DLM_EUNLOCK);
2547 	return -DLM_EUNLOCK;
2548 }
2549 
2550 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2551 			      int error)
2552 {
2553 	grant_pending_locks(r);
2554 }
2555 
2556 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2557 
2558 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2559 {
2560 	int error;
2561 
2562 	error = revert_lock(r, lkb);
2563 	if (error) {
2564 		queue_cast(r, lkb, -DLM_ECANCEL);
2565 		return -DLM_ECANCEL;
2566 	}
2567 	return 0;
2568 }
2569 
2570 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2571 			      int error)
2572 {
2573 	if (error)
2574 		grant_pending_locks(r);
2575 }
2576 
2577 /*
2578  * Four stage 3 varieties:
2579  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2580  */
2581 
2582 /* add a new lkb to a possibly new rsb, called by requesting process */
2583 
2584 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2585 {
2586 	int error;
2587 
2588 	/* set_master: sets lkb nodeid from r */
2589 
2590 	error = set_master(r, lkb);
2591 	if (error < 0)
2592 		goto out;
2593 	if (error) {
2594 		error = 0;
2595 		goto out;
2596 	}
2597 
2598 	if (is_remote(r)) {
2599 		/* receive_request() calls do_request() on remote node */
2600 		error = send_request(r, lkb);
2601 	} else {
2602 		error = do_request(r, lkb);
2603 		/* for remote locks the request_reply is sent
2604 		   between do_request and do_request_effects */
2605 		do_request_effects(r, lkb, error);
2606 	}
2607  out:
2608 	return error;
2609 }
2610 
2611 /* change some property of an existing lkb, e.g. mode */
2612 
2613 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2614 {
2615 	int error;
2616 
2617 	if (is_remote(r)) {
2618 		/* receive_convert() calls do_convert() on remote node */
2619 		error = send_convert(r, lkb);
2620 	} else {
2621 		error = do_convert(r, lkb);
2622 		/* for remote locks the convert_reply is sent
2623 		   between do_convert and do_convert_effects */
2624 		do_convert_effects(r, lkb, error);
2625 	}
2626 
2627 	return error;
2628 }
2629 
2630 /* remove an existing lkb from the granted queue */
2631 
2632 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2633 {
2634 	int error;
2635 
2636 	if (is_remote(r)) {
2637 		/* receive_unlock() calls do_unlock() on remote node */
2638 		error = send_unlock(r, lkb);
2639 	} else {
2640 		error = do_unlock(r, lkb);
2641 		/* for remote locks the unlock_reply is sent
2642 		   between do_unlock and do_unlock_effects */
2643 		do_unlock_effects(r, lkb, error);
2644 	}
2645 
2646 	return error;
2647 }
2648 
2649 /* remove an existing lkb from the convert or wait queue */
2650 
2651 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2652 {
2653 	int error;
2654 
2655 	if (is_remote(r)) {
2656 		/* receive_cancel() calls do_cancel() on remote node */
2657 		error = send_cancel(r, lkb);
2658 	} else {
2659 		error = do_cancel(r, lkb);
2660 		/* for remote locks the cancel_reply is sent
2661 		   between do_cancel and do_cancel_effects */
2662 		do_cancel_effects(r, lkb, error);
2663 	}
2664 
2665 	return error;
2666 }
2667 
2668 /*
2669  * Four stage 2 varieties:
2670  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2671  */
2672 
2673 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2674 			int len, struct dlm_args *args)
2675 {
2676 	struct dlm_rsb *r;
2677 	int error;
2678 
2679 	error = validate_lock_args(ls, lkb, args);
2680 	if (error)
2681 		goto out;
2682 
2683 	error = find_rsb(ls, name, len, R_CREATE, &r);
2684 	if (error)
2685 		goto out;
2686 
2687 	lock_rsb(r);
2688 
2689 	attach_lkb(r, lkb);
2690 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2691 
2692 	error = _request_lock(r, lkb);
2693 
2694 	unlock_rsb(r);
2695 	put_rsb(r);
2696 
2697  out:
2698 	return error;
2699 }
2700 
2701 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2702 			struct dlm_args *args)
2703 {
2704 	struct dlm_rsb *r;
2705 	int error;
2706 
2707 	r = lkb->lkb_resource;
2708 
2709 	hold_rsb(r);
2710 	lock_rsb(r);
2711 
2712 	error = validate_lock_args(ls, lkb, args);
2713 	if (error)
2714 		goto out;
2715 
2716 	error = _convert_lock(r, lkb);
2717  out:
2718 	unlock_rsb(r);
2719 	put_rsb(r);
2720 	return error;
2721 }
2722 
2723 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2724 		       struct dlm_args *args)
2725 {
2726 	struct dlm_rsb *r;
2727 	int error;
2728 
2729 	r = lkb->lkb_resource;
2730 
2731 	hold_rsb(r);
2732 	lock_rsb(r);
2733 
2734 	error = validate_unlock_args(lkb, args);
2735 	if (error)
2736 		goto out;
2737 
2738 	error = _unlock_lock(r, lkb);
2739  out:
2740 	unlock_rsb(r);
2741 	put_rsb(r);
2742 	return error;
2743 }
2744 
2745 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2746 		       struct dlm_args *args)
2747 {
2748 	struct dlm_rsb *r;
2749 	int error;
2750 
2751 	r = lkb->lkb_resource;
2752 
2753 	hold_rsb(r);
2754 	lock_rsb(r);
2755 
2756 	error = validate_unlock_args(lkb, args);
2757 	if (error)
2758 		goto out;
2759 
2760 	error = _cancel_lock(r, lkb);
2761  out:
2762 	unlock_rsb(r);
2763 	put_rsb(r);
2764 	return error;
2765 }
2766 
2767 /*
2768  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2769  */
2770 
2771 int dlm_lock(dlm_lockspace_t *lockspace,
2772 	     int mode,
2773 	     struct dlm_lksb *lksb,
2774 	     uint32_t flags,
2775 	     void *name,
2776 	     unsigned int namelen,
2777 	     uint32_t parent_lkid,
2778 	     void (*ast) (void *astarg),
2779 	     void *astarg,
2780 	     void (*bast) (void *astarg, int mode))
2781 {
2782 	struct dlm_ls *ls;
2783 	struct dlm_lkb *lkb;
2784 	struct dlm_args args;
2785 	int error, convert = flags & DLM_LKF_CONVERT;
2786 
2787 	ls = dlm_find_lockspace_local(lockspace);
2788 	if (!ls)
2789 		return -EINVAL;
2790 
2791 	dlm_lock_recovery(ls);
2792 
2793 	if (convert)
2794 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2795 	else
2796 		error = create_lkb(ls, &lkb);
2797 
2798 	if (error)
2799 		goto out;
2800 
2801 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2802 			      astarg, bast, &args);
2803 	if (error)
2804 		goto out_put;
2805 
2806 	if (convert)
2807 		error = convert_lock(ls, lkb, &args);
2808 	else
2809 		error = request_lock(ls, lkb, name, namelen, &args);
2810 
2811 	if (error == -EINPROGRESS)
2812 		error = 0;
2813  out_put:
2814 	if (convert || error)
2815 		__put_lkb(ls, lkb);
2816 	if (error == -EAGAIN || error == -EDEADLK)
2817 		error = 0;
2818  out:
2819 	dlm_unlock_recovery(ls);
2820 	dlm_put_lockspace(ls);
2821 	return error;
2822 }
2823 
2824 int dlm_unlock(dlm_lockspace_t *lockspace,
2825 	       uint32_t lkid,
2826 	       uint32_t flags,
2827 	       struct dlm_lksb *lksb,
2828 	       void *astarg)
2829 {
2830 	struct dlm_ls *ls;
2831 	struct dlm_lkb *lkb;
2832 	struct dlm_args args;
2833 	int error;
2834 
2835 	ls = dlm_find_lockspace_local(lockspace);
2836 	if (!ls)
2837 		return -EINVAL;
2838 
2839 	dlm_lock_recovery(ls);
2840 
2841 	error = find_lkb(ls, lkid, &lkb);
2842 	if (error)
2843 		goto out;
2844 
2845 	error = set_unlock_args(flags, astarg, &args);
2846 	if (error)
2847 		goto out_put;
2848 
2849 	if (flags & DLM_LKF_CANCEL)
2850 		error = cancel_lock(ls, lkb, &args);
2851 	else
2852 		error = unlock_lock(ls, lkb, &args);
2853 
2854 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2855 		error = 0;
2856 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2857 		error = 0;
2858  out_put:
2859 	dlm_put_lkb(lkb);
2860  out:
2861 	dlm_unlock_recovery(ls);
2862 	dlm_put_lockspace(ls);
2863 	return error;
2864 }
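
/* For illustration only (not part of the original file): a typical
 * in-kernel caller, assuming "ls" came from dlm_new_lockspace() and
 * my_ast/my_bast/my_obj are the caller's callbacks and argument.
 * dlm_lock() is asynchronous; the lock result arrives in lksb.sb_status
 * when my_ast runs.
 *
 *	struct dlm_lksb lksb;
 *	int error;
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_NOQUEUE,
 *			 "myres", 5, 0, my_ast, my_obj, my_bast);
 *	if (!error) {
 *		... wait for my_ast, check lksb.sb_status ...
 *		error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_obj);
 *	}
 */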
2865 
2866 /*
2867  * send/receive routines for remote operations and replies
2868  *
2869  * send_args
2870  * send_common
2871  * send_request			receive_request
2872  * send_convert			receive_convert
2873  * send_unlock			receive_unlock
2874  * send_cancel			receive_cancel
2875  * send_grant			receive_grant
2876  * send_bast			receive_bast
2877  * send_lookup			receive_lookup
2878  * send_remove			receive_remove
2879  *
2880  * 				send_common_reply
2881  * receive_request_reply	send_request_reply
2882  * receive_convert_reply	send_convert_reply
2883  * receive_unlock_reply		send_unlock_reply
2884  * receive_cancel_reply		send_cancel_reply
2885  * receive_lookup_reply		send_lookup_reply
2886  */
2887 
2888 static int _create_message(struct dlm_ls *ls, int mb_len,
2889 			   int to_nodeid, int mstype,
2890 			   struct dlm_message **ms_ret,
2891 			   struct dlm_mhandle **mh_ret)
2892 {
2893 	struct dlm_message *ms;
2894 	struct dlm_mhandle *mh;
2895 	char *mb;
2896 
2897 	/* get_buffer gives us a message handle (mh) that we need to
2898 	   pass into lowcomms_commit and a message buffer (mb) that we
2899 	   write our data into */
2900 
2901 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2902 	if (!mh)
2903 		return -ENOBUFS;
2904 
2905 	memset(mb, 0, mb_len);
2906 
2907 	ms = (struct dlm_message *) mb;
2908 
2909 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2910 	ms->m_header.h_lockspace = ls->ls_global_id;
2911 	ms->m_header.h_nodeid = dlm_our_nodeid();
2912 	ms->m_header.h_length = mb_len;
2913 	ms->m_header.h_cmd = DLM_MSG;
2914 
2915 	ms->m_type = mstype;
2916 
2917 	*mh_ret = mh;
2918 	*ms_ret = ms;
2919 	return 0;
2920 }
2921 
2922 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2923 			  int to_nodeid, int mstype,
2924 			  struct dlm_message **ms_ret,
2925 			  struct dlm_mhandle **mh_ret)
2926 {
2927 	int mb_len = sizeof(struct dlm_message);
2928 
2929 	switch (mstype) {
2930 	case DLM_MSG_REQUEST:
2931 	case DLM_MSG_LOOKUP:
2932 	case DLM_MSG_REMOVE:
2933 		mb_len += r->res_length;
2934 		break;
2935 	case DLM_MSG_CONVERT:
2936 	case DLM_MSG_UNLOCK:
2937 	case DLM_MSG_REQUEST_REPLY:
2938 	case DLM_MSG_CONVERT_REPLY:
2939 	case DLM_MSG_GRANT:
2940 		if (lkb && lkb->lkb_lvbptr)
2941 			mb_len += r->res_ls->ls_lvblen;
2942 		break;
2943 	}
2944 
2945 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2946 			       ms_ret, mh_ret);
2947 }
2948 
2949 /* further lowcomms enhancements or alternate implementations may make
2950    the return value from this function useful at some point */
2951 
2952 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2953 {
2954 	dlm_message_out(ms);
2955 	dlm_lowcomms_commit_buffer(mh);
2956 	return 0;
2957 }
2958 
2959 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2960 		      struct dlm_message *ms)
2961 {
2962 	ms->m_nodeid   = lkb->lkb_nodeid;
2963 	ms->m_pid      = lkb->lkb_ownpid;
2964 	ms->m_lkid     = lkb->lkb_id;
2965 	ms->m_remid    = lkb->lkb_remid;
2966 	ms->m_exflags  = lkb->lkb_exflags;
2967 	ms->m_sbflags  = lkb->lkb_sbflags;
2968 	ms->m_flags    = lkb->lkb_flags;
2969 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2970 	ms->m_status   = lkb->lkb_status;
2971 	ms->m_grmode   = lkb->lkb_grmode;
2972 	ms->m_rqmode   = lkb->lkb_rqmode;
2973 	ms->m_hash     = r->res_hash;
2974 
2975 	/* m_result and m_bastmode are set from function args,
2976 	   not from lkb fields */
2977 
2978 	if (lkb->lkb_bastfn)
2979 		ms->m_asts |= DLM_CB_BAST;
2980 	if (lkb->lkb_astfn)
2981 		ms->m_asts |= DLM_CB_CAST;
2982 
2983 	/* compare with switch in create_message; send_remove() doesn't
2984 	   use send_args() */
2985 
2986 	switch (ms->m_type) {
2987 	case DLM_MSG_REQUEST:
2988 	case DLM_MSG_LOOKUP:
2989 		memcpy(ms->m_extra, r->res_name, r->res_length);
2990 		break;
2991 	case DLM_MSG_CONVERT:
2992 	case DLM_MSG_UNLOCK:
2993 	case DLM_MSG_REQUEST_REPLY:
2994 	case DLM_MSG_CONVERT_REPLY:
2995 	case DLM_MSG_GRANT:
2996 		if (!lkb->lkb_lvbptr)
2997 			break;
2998 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2999 		break;
3000 	}
3001 }
3002 
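/* Before sending a message that expects a reply, the lkb is added to the
   ls_waiters list keyed by message type so the eventual xxxx_reply can
   be matched back to it; if creating or sending the message fails, the
   lkb is removed again using the corresponding reply type. */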
3003 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3004 {
3005 	struct dlm_message *ms;
3006 	struct dlm_mhandle *mh;
3007 	int to_nodeid, error;
3008 
3009 	to_nodeid = r->res_nodeid;
3010 
3011 	error = add_to_waiters(lkb, mstype, to_nodeid);
3012 	if (error)
3013 		return error;
3014 
3015 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3016 	if (error)
3017 		goto fail;
3018 
3019 	send_args(r, lkb, ms);
3020 
3021 	error = send_message(mh, ms);
3022 	if (error)
3023 		goto fail;
3024 	return 0;
3025 
3026  fail:
3027 	remove_from_waiters(lkb, msg_reply_type(mstype));
3028 	return error;
3029 }
3030 
3031 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3032 {
3033 	return send_common(r, lkb, DLM_MSG_REQUEST);
3034 }
3035 
3036 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3037 {
3038 	int error;
3039 
3040 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3041 
3042 	/* down conversions go without a reply from the master: a convert
	   to a less restrictive mode always succeeds on the master, so the
	   result is known here and a stub reply is processed locally */
3043 	if (!error && down_conversion(lkb)) {
3044 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3045 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3046 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3047 		r->res_ls->ls_stub_ms.m_result = 0;
3048 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3049 	}
3050 
3051 	return error;
3052 }
3053 
3054 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3055    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3056    that the master is still correct. */
3057 
3058 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3059 {
3060 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3061 }
3062 
3063 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065 	return send_common(r, lkb, DLM_MSG_CANCEL);
3066 }
3067 
3068 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3069 {
3070 	struct dlm_message *ms;
3071 	struct dlm_mhandle *mh;
3072 	int to_nodeid, error;
3073 
3074 	to_nodeid = lkb->lkb_nodeid;
3075 
3076 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3077 	if (error)
3078 		goto out;
3079 
3080 	send_args(r, lkb, ms);
3081 
3082 	ms->m_result = 0;
3083 
3084 	error = send_message(mh, ms);
3085  out:
3086 	return error;
3087 }
3088 
3089 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3090 {
3091 	struct dlm_message *ms;
3092 	struct dlm_mhandle *mh;
3093 	int to_nodeid, error;
3094 
3095 	to_nodeid = lkb->lkb_nodeid;
3096 
3097 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3098 	if (error)
3099 		goto out;
3100 
3101 	send_args(r, lkb, ms);
3102 
3103 	ms->m_bastmode = mode;
3104 
3105 	error = send_message(mh, ms);
3106  out:
3107 	return error;
3108 }
3109 
3110 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3111 {
3112 	struct dlm_message *ms;
3113 	struct dlm_mhandle *mh;
3114 	int to_nodeid, error;
3115 
3116 	to_nodeid = dlm_dir_nodeid(r);
3117 
3118 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3119 	if (error)
3120 		return error;
3121 
3122 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3123 	if (error)
3124 		goto fail;
3125 
3126 	send_args(r, lkb, ms);
3127 
3128 	error = send_message(mh, ms);
3129 	if (error)
3130 		goto fail;
3131 	return 0;
3132 
3133  fail:
3134 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3135 	return error;
3136 }
3137 
3138 static int send_remove(struct dlm_rsb *r)
3139 {
3140 	struct dlm_message *ms;
3141 	struct dlm_mhandle *mh;
3142 	int to_nodeid, error;
3143 
3144 	to_nodeid = dlm_dir_nodeid(r);
3145 
3146 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3147 	if (error)
3148 		goto out;
3149 
3150 	memcpy(ms->m_extra, r->res_name, r->res_length);
3151 	ms->m_hash = r->res_hash;
3152 
3153 	error = send_message(mh, ms);
3154  out:
3155 	return error;
3156 }
3157 
3158 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3159 			     int mstype, int rv)
3160 {
3161 	struct dlm_message *ms;
3162 	struct dlm_mhandle *mh;
3163 	int to_nodeid, error;
3164 
3165 	to_nodeid = lkb->lkb_nodeid;
3166 
3167 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3168 	if (error)
3169 		goto out;
3170 
3171 	send_args(r, lkb, ms);
3172 
3173 	ms->m_result = rv;
3174 
3175 	error = send_message(mh, ms);
3176  out:
3177 	return error;
3178 }
3179 
3180 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3181 {
3182 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3183 }
3184 
3185 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3186 {
3187 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3188 }
3189 
3190 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3191 {
3192 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3193 }
3194 
3195 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3196 {
3197 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3198 }
3199 
3200 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3201 			     int ret_nodeid, int rv)
3202 {
3203 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3204 	struct dlm_message *ms;
3205 	struct dlm_mhandle *mh;
3206 	int error, nodeid = ms_in->m_header.h_nodeid;
3207 
3208 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3209 	if (error)
3210 		goto out;
3211 
3212 	ms->m_lkid = ms_in->m_lkid;
3213 	ms->m_result = rv;
3214 	ms->m_nodeid = ret_nodeid;
3215 
3216 	error = send_message(mh, ms);
3217  out:
3218 	return error;
3219 }
3220 
3221 /* which args we save from a received message depends heavily on the type
3222    of message, unlike the send side where we can safely send everything about
3223    the lkb for any type of message */
3224 
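/* Only the low 16 bits of lkb_flags are carried in messages; the high 16
   bits are node-local state that must survive a receive, which is what
   the masking below preserves. */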
3225 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3226 {
3227 	lkb->lkb_exflags = ms->m_exflags;
3228 	lkb->lkb_sbflags = ms->m_sbflags;
3229 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3230 		         (ms->m_flags & 0x0000FFFF);
3231 }
3232 
3233 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3234 {
3235 	if (ms->m_flags == DLM_IFL_STUB_MS)
3236 		return;
3237 
3238 	lkb->lkb_sbflags = ms->m_sbflags;
3239 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3240 		         (ms->m_flags & 0x0000FFFF);
3241 }
3242 
3243 static int receive_extralen(struct dlm_message *ms)
3244 {
3245 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3246 }
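
/* Worked example (illustrative): create_message() for a DLM_MSG_REQUEST
 * on a resource named "foo" sets
 *
 *	h_length = sizeof(struct dlm_message) + 3
 *
 * so receive_extralen() on the receiving node returns 3, the length of
 * the name carried in m_extra. */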
3247 
3248 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3249 		       struct dlm_message *ms)
3250 {
3251 	int len;
3252 
3253 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3254 		if (!lkb->lkb_lvbptr)
3255 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3256 		if (!lkb->lkb_lvbptr)
3257 			return -ENOMEM;
3258 		len = receive_extralen(ms);
3259 		if (len > DLM_RESNAME_MAXLEN)
3260 			len = DLM_RESNAME_MAXLEN;
3261 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3262 	}
3263 	return 0;
3264 }
3265 
3266 static void fake_bastfn(void *astparam, int mode)
3267 {
3268 	log_print("fake_bastfn should not be called");
3269 }
3270 
3271 static void fake_astfn(void *astparam)
3272 {
3273 	log_print("fake_astfn should not be called");
3274 }
3275 
3276 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3277 				struct dlm_message *ms)
3278 {
3279 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3280 	lkb->lkb_ownpid = ms->m_pid;
3281 	lkb->lkb_remid = ms->m_lkid;
3282 	lkb->lkb_grmode = DLM_LOCK_IV;
3283 	lkb->lkb_rqmode = ms->m_rqmode;
3284 
3285 	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3286 	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3287 
3288 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3289 		/* lkb was just created so there won't be an lvb yet */
3290 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3291 		if (!lkb->lkb_lvbptr)
3292 			return -ENOMEM;
3293 	}
3294 
3295 	return 0;
3296 }
3297 
3298 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3299 				struct dlm_message *ms)
3300 {
3301 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3302 		return -EBUSY;
3303 
3304 	if (receive_lvb(ls, lkb, ms))
3305 		return -ENOMEM;
3306 
3307 	lkb->lkb_rqmode = ms->m_rqmode;
3308 	lkb->lkb_lvbseq = ms->m_lvbseq;
3309 
3310 	return 0;
3311 }
3312 
3313 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3314 			       struct dlm_message *ms)
3315 {
3316 	if (receive_lvb(ls, lkb, ms))
3317 		return -ENOMEM;
3318 	return 0;
3319 }
3320 
3321 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3322    uses to send a reply and that the remote end uses to process the reply. */
3323 
3324 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3325 {
3326 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3327 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3328 	lkb->lkb_remid = ms->m_lkid;
3329 }
3330 
3331 /* This is called after the rsb is locked so that we can safely inspect
3332    fields in the lkb. */
3333 
3334 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3335 {
3336 	int from = ms->m_header.h_nodeid;
3337 	int error = 0;
3338 
3339 	switch (ms->m_type) {
3340 	case DLM_MSG_CONVERT:
3341 	case DLM_MSG_UNLOCK:
3342 	case DLM_MSG_CANCEL:
3343 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3344 			error = -EINVAL;
3345 		break;
3346 
3347 	case DLM_MSG_CONVERT_REPLY:
3348 	case DLM_MSG_UNLOCK_REPLY:
3349 	case DLM_MSG_CANCEL_REPLY:
3350 	case DLM_MSG_GRANT:
3351 	case DLM_MSG_BAST:
3352 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3353 			error = -EINVAL;
3354 		break;
3355 
3356 	case DLM_MSG_REQUEST_REPLY:
3357 		if (!is_process_copy(lkb))
3358 			error = -EINVAL;
3359 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3360 			error = -EINVAL;
3361 		break;
3362 
3363 	default:
3364 		error = -EINVAL;
3365 	}
3366 
3367 	if (error)
3368 		log_error(lkb->lkb_resource->res_ls,
3369 			  "ignore invalid message %d from %d %x %x %x %d",
3370 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3371 			  lkb->lkb_flags, lkb->lkb_nodeid);
3372 	return error;
3373 }
3374 
3375 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3376 {
3377 	struct dlm_lkb *lkb;
3378 	struct dlm_rsb *r;
3379 	int error, namelen;
3380 
3381 	error = create_lkb(ls, &lkb);
3382 	if (error)
3383 		goto fail;
3384 
3385 	receive_flags(lkb, ms);
3386 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3387 	error = receive_request_args(ls, lkb, ms);
3388 	if (error) {
3389 		__put_lkb(ls, lkb);
3390 		goto fail;
3391 	}
3392 
3393 	namelen = receive_extralen(ms);
3394 
3395 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3396 	if (error) {
3397 		__put_lkb(ls, lkb);
3398 		goto fail;
3399 	}
3400 
3401 	lock_rsb(r);
3402 
3403 	attach_lkb(r, lkb);
3404 	error = do_request(r, lkb);
3405 	send_request_reply(r, lkb, error);
3406 	do_request_effects(r, lkb, error);
3407 
3408 	unlock_rsb(r);
3409 	put_rsb(r);
3410 
3411 	if (error == -EINPROGRESS)
3412 		error = 0;
3413 	if (error)
3414 		dlm_put_lkb(lkb);
3415 	return;
3416 
3417  fail:
3418 	setup_stub_lkb(ls, ms);
3419 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3420 }
3421 
3422 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3423 {
3424 	struct dlm_lkb *lkb;
3425 	struct dlm_rsb *r;
3426 	int error, reply = 1;
3427 
3428 	error = find_lkb(ls, ms->m_remid, &lkb);
3429 	if (error)
3430 		goto fail;
3431 
3432 	r = lkb->lkb_resource;
3433 
3434 	hold_rsb(r);
3435 	lock_rsb(r);
3436 
3437 	error = validate_message(lkb, ms);
3438 	if (error)
3439 		goto out;
3440 
3441 	receive_flags(lkb, ms);
3442 
3443 	error = receive_convert_args(ls, lkb, ms);
3444 	if (error) {
3445 		send_convert_reply(r, lkb, error);
3446 		goto out;
3447 	}
3448 
3449 	reply = !down_conversion(lkb);
3450 
3451 	error = do_convert(r, lkb);
3452 	if (reply)
3453 		send_convert_reply(r, lkb, error);
3454 	do_convert_effects(r, lkb, error);
3455  out:
3456 	unlock_rsb(r);
3457 	put_rsb(r);
3458 	dlm_put_lkb(lkb);
3459 	return;
3460 
3461  fail:
3462 	setup_stub_lkb(ls, ms);
3463 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3464 }
3465 
3466 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3467 {
3468 	struct dlm_lkb *lkb;
3469 	struct dlm_rsb *r;
3470 	int error;
3471 
3472 	error = find_lkb(ls, ms->m_remid, &lkb);
3473 	if (error)
3474 		goto fail;
3475 
3476 	r = lkb->lkb_resource;
3477 
3478 	hold_rsb(r);
3479 	lock_rsb(r);
3480 
3481 	error = validate_message(lkb, ms);
3482 	if (error)
3483 		goto out;
3484 
3485 	receive_flags(lkb, ms);
3486 
3487 	error = receive_unlock_args(ls, lkb, ms);
3488 	if (error) {
3489 		send_unlock_reply(r, lkb, error);
3490 		goto out;
3491 	}
3492 
3493 	error = do_unlock(r, lkb);
3494 	send_unlock_reply(r, lkb, error);
3495 	do_unlock_effects(r, lkb, error);
3496  out:
3497 	unlock_rsb(r);
3498 	put_rsb(r);
3499 	dlm_put_lkb(lkb);
3500 	return;
3501 
3502  fail:
3503 	setup_stub_lkb(ls, ms);
3504 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3505 }
3506 
3507 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3508 {
3509 	struct dlm_lkb *lkb;
3510 	struct dlm_rsb *r;
3511 	int error;
3512 
3513 	error = find_lkb(ls, ms->m_remid, &lkb);
3514 	if (error)
3515 		goto fail;
3516 
3517 	receive_flags(lkb, ms);
3518 
3519 	r = lkb->lkb_resource;
3520 
3521 	hold_rsb(r);
3522 	lock_rsb(r);
3523 
3524 	error = validate_message(lkb, ms);
3525 	if (error)
3526 		goto out;
3527 
3528 	error = do_cancel(r, lkb);
3529 	send_cancel_reply(r, lkb, error);
3530 	do_cancel_effects(r, lkb, error);
3531  out:
3532 	unlock_rsb(r);
3533 	put_rsb(r);
3534 	dlm_put_lkb(lkb);
3535 	return;
3536 
3537  fail:
3538 	setup_stub_lkb(ls, ms);
3539 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3540 }
3541 
3542 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3543 {
3544 	struct dlm_lkb *lkb;
3545 	struct dlm_rsb *r;
3546 	int error;
3547 
3548 	error = find_lkb(ls, ms->m_remid, &lkb);
3549 	if (error) {
3550 		log_debug(ls, "receive_grant from %d no lkb %x",
3551 			  ms->m_header.h_nodeid, ms->m_remid);
3552 		return;
3553 	}
3554 
3555 	r = lkb->lkb_resource;
3556 
3557 	hold_rsb(r);
3558 	lock_rsb(r);
3559 
3560 	error = validate_message(lkb, ms);
3561 	if (error)
3562 		goto out;
3563 
3564 	receive_flags_reply(lkb, ms);
3565 	if (is_altmode(lkb))
3566 		munge_altmode(lkb, ms);
3567 	grant_lock_pc(r, lkb, ms);
3568 	queue_cast(r, lkb, 0);
3569  out:
3570 	unlock_rsb(r);
3571 	put_rsb(r);
3572 	dlm_put_lkb(lkb);
3573 }
3574 
3575 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3576 {
3577 	struct dlm_lkb *lkb;
3578 	struct dlm_rsb *r;
3579 	int error;
3580 
3581 	error = find_lkb(ls, ms->m_remid, &lkb);
3582 	if (error) {
3583 		log_debug(ls, "receive_bast from %d no lkb %x",
3584 			  ms->m_header.h_nodeid, ms->m_remid);
3585 		return;
3586 	}
3587 
3588 	r = lkb->lkb_resource;
3589 
3590 	hold_rsb(r);
3591 	lock_rsb(r);
3592 
3593 	error = validate_message(lkb, ms);
3594 	if (error)
3595 		goto out;
3596 
3597 	queue_bast(r, lkb, ms->m_bastmode);
3598  out:
3599 	unlock_rsb(r);
3600 	put_rsb(r);
3601 	dlm_put_lkb(lkb);
3602 }
3603 
3604 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3605 {
3606 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3607 
3608 	from_nodeid = ms->m_header.h_nodeid;
3609 	our_nodeid = dlm_our_nodeid();
3610 
3611 	len = receive_extralen(ms);
3612 
3613 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3614 	if (dir_nodeid != our_nodeid) {
3615 		log_error(ls, "lookup dir_nodeid %d from %d",
3616 			  dir_nodeid, from_nodeid);
3617 		error = -EINVAL;
3618 		ret_nodeid = -1;
3619 		goto out;
3620 	}
3621 
3622 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3623 
3624 	/* Optimization: we're master so treat lookup as a request */
3625 	if (!error && ret_nodeid == our_nodeid) {
3626 		receive_request(ls, ms);
3627 		return;
3628 	}
3629  out:
3630 	send_lookup_reply(ls, ms, ret_nodeid, error);
3631 }
3632 
3633 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3634 {
3635 	int len, dir_nodeid, from_nodeid;
3636 
3637 	from_nodeid = ms->m_header.h_nodeid;
3638 
3639 	len = receive_extralen(ms);
3640 
3641 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3642 	if (dir_nodeid != dlm_our_nodeid()) {
3643 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3644 			  dir_nodeid, from_nodeid);
3645 		return;
3646 	}
3647 
3648 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3649 }
3650 
3651 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3652 {
3653 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3654 }
3655 
3656 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3657 {
3658 	struct dlm_lkb *lkb;
3659 	struct dlm_rsb *r;
3660 	int error, mstype, result;
3661 
3662 	error = find_lkb(ls, ms->m_remid, &lkb);
3663 	if (error) {
3664 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3665 			  ms->m_header.h_nodeid, ms->m_remid);
3666 		return;
3667 	}
3668 
3669 	r = lkb->lkb_resource;
3670 	hold_rsb(r);
3671 	lock_rsb(r);
3672 
3673 	error = validate_message(lkb, ms);
3674 	if (error)
3675 		goto out;
3676 
3677 	mstype = lkb->lkb_wait_type;
3678 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3679 	if (error)
3680 		goto out;
3681 
3682 	/* Optimization: the dir node was also the master, so it took our
3683 	   lookup as a request and sent request reply instead of lookup reply */
3684 	if (mstype == DLM_MSG_LOOKUP) {
3685 		r->res_nodeid = ms->m_header.h_nodeid;
3686 		lkb->lkb_nodeid = r->res_nodeid;
3687 	}
3688 
3689 	/* this is the value returned from do_request() on the master */
3690 	result = ms->m_result;
3691 
3692 	switch (result) {
3693 	case -EAGAIN:
3694 		/* request would block (be queued) on remote master */
3695 		queue_cast(r, lkb, -EAGAIN);
3696 		confirm_master(r, -EAGAIN);
3697 		unhold_lkb(lkb); /* undoes create_lkb() */
3698 		break;
3699 
3700 	case -EINPROGRESS:
3701 	case 0:
3702 		/* request was queued or granted on remote master */
3703 		receive_flags_reply(lkb, ms);
3704 		lkb->lkb_remid = ms->m_lkid;
3705 		if (is_altmode(lkb))
3706 			munge_altmode(lkb, ms);
3707 		if (result) {
3708 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3709 			add_timeout(lkb);
3710 		} else {
3711 			grant_lock_pc(r, lkb, ms);
3712 			queue_cast(r, lkb, 0);
3713 		}
3714 		confirm_master(r, result);
3715 		break;
3716 
3717 	case -EBADR:
3718 	case -ENOTBLK:
3719 		/* find_rsb failed to find rsb or rsb wasn't master */
3720 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3721 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3722 		r->res_nodeid = -1;
3723 		lkb->lkb_nodeid = -1;
3724 
3725 		if (is_overlap(lkb)) {
3726 			/* we'll ignore error in cancel/unlock reply */
3727 			queue_cast_overlap(r, lkb);
3728 			confirm_master(r, result);
3729 			unhold_lkb(lkb); /* undoes create_lkb() */
3730 		} else
3731 			_request_lock(r, lkb);
3732 		break;
3733 
3734 	default:
3735 		log_error(ls, "receive_request_reply %x error %d",
3736 			  lkb->lkb_id, result);
3737 	}
3738 
3739 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3740 		log_debug(ls, "receive_request_reply %x result %d unlock",
3741 			  lkb->lkb_id, result);
3742 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3743 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3744 		send_unlock(r, lkb);
3745 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3746 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3747 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3748 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3749 		send_cancel(r, lkb);
3750 	} else {
3751 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3752 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3753 	}
3754  out:
3755 	unlock_rsb(r);
3756 	put_rsb(r);
3757 	dlm_put_lkb(lkb);
3758 }
3759 
3760 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3761 				    struct dlm_message *ms)
3762 {
3763 	/* this is the value returned from do_convert() on the master */
3764 	switch (ms->m_result) {
3765 	case -EAGAIN:
3766 		/* convert would block (be queued) on remote master */
3767 		queue_cast(r, lkb, -EAGAIN);
3768 		break;
3769 
3770 	case -EDEADLK:
3771 		receive_flags_reply(lkb, ms);
3772 		revert_lock_pc(r, lkb);
3773 		queue_cast(r, lkb, -EDEADLK);
3774 		break;
3775 
3776 	case -EINPROGRESS:
3777 		/* convert was queued on remote master */
3778 		receive_flags_reply(lkb, ms);
3779 		if (is_demoted(lkb))
3780 			munge_demoted(lkb);
3781 		del_lkb(r, lkb);
3782 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3783 		add_timeout(lkb);
3784 		break;
3785 
3786 	case 0:
3787 		/* convert was granted on remote master */
3788 		receive_flags_reply(lkb, ms);
3789 		if (is_demoted(lkb))
3790 			munge_demoted(lkb);
3791 		grant_lock_pc(r, lkb, ms);
3792 		queue_cast(r, lkb, 0);
3793 		break;
3794 
3795 	default:
3796 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3797 			  lkb->lkb_id, ms->m_result);
3798 	}
3799 }
3800 
3801 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3802 {
3803 	struct dlm_rsb *r = lkb->lkb_resource;
3804 	int error;
3805 
3806 	hold_rsb(r);
3807 	lock_rsb(r);
3808 
3809 	error = validate_message(lkb, ms);
3810 	if (error)
3811 		goto out;
3812 
3813 	/* stub reply can happen with waiters_mutex held */
3814 	error = remove_from_waiters_ms(lkb, ms);
3815 	if (error)
3816 		goto out;
3817 
3818 	__receive_convert_reply(r, lkb, ms);
3819  out:
3820 	unlock_rsb(r);
3821 	put_rsb(r);
3822 }
3823 
3824 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3825 {
3826 	struct dlm_lkb *lkb;
3827 	int error;
3828 
3829 	error = find_lkb(ls, ms->m_remid, &lkb);
3830 	if (error) {
3831 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3832 			  ms->m_header.h_nodeid, ms->m_remid);
3833 		return;
3834 	}
3835 
3836 	_receive_convert_reply(lkb, ms);
3837 	dlm_put_lkb(lkb);
3838 }
3839 
3840 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3841 {
3842 	struct dlm_rsb *r = lkb->lkb_resource;
3843 	int error;
3844 
3845 	hold_rsb(r);
3846 	lock_rsb(r);
3847 
3848 	error = validate_message(lkb, ms);
3849 	if (error)
3850 		goto out;
3851 
3852 	/* stub reply can happen with waiters_mutex held */
3853 	error = remove_from_waiters_ms(lkb, ms);
3854 	if (error)
3855 		goto out;
3856 
3857 	/* this is the value returned from do_unlock() on the master */
3858 
3859 	switch (ms->m_result) {
3860 	case -DLM_EUNLOCK:
3861 		receive_flags_reply(lkb, ms);
3862 		remove_lock_pc(r, lkb);
3863 		queue_cast(r, lkb, -DLM_EUNLOCK);
3864 		break;
3865 	case -ENOENT:
3866 		break;
3867 	default:
3868 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3869 			  lkb->lkb_id, ms->m_result);
3870 	}
3871  out:
3872 	unlock_rsb(r);
3873 	put_rsb(r);
3874 }
3875 
3876 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3877 {
3878 	struct dlm_lkb *lkb;
3879 	int error;
3880 
3881 	error = find_lkb(ls, ms->m_remid, &lkb);
3882 	if (error) {
3883 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3884 			  ms->m_header.h_nodeid, ms->m_remid);
3885 		return;
3886 	}
3887 
3888 	_receive_unlock_reply(lkb, ms);
3889 	dlm_put_lkb(lkb);
3890 }
3891 
3892 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3893 {
3894 	struct dlm_rsb *r = lkb->lkb_resource;
3895 	int error;
3896 
3897 	hold_rsb(r);
3898 	lock_rsb(r);
3899 
3900 	error = validate_message(lkb, ms);
3901 	if (error)
3902 		goto out;
3903 
3904 	/* stub reply can happen with waiters_mutex held */
3905 	error = remove_from_waiters_ms(lkb, ms);
3906 	if (error)
3907 		goto out;
3908 
3909 	/* this is the value returned from do_cancel() on the master */
3910 
3911 	switch (ms->m_result) {
3912 	case -DLM_ECANCEL:
3913 		receive_flags_reply(lkb, ms);
3914 		revert_lock_pc(r, lkb);
3915 		queue_cast(r, lkb, -DLM_ECANCEL);
3916 		break;
3917 	case 0:
3918 		break;
3919 	default:
3920 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3921 			  lkb->lkb_id, ms->m_result);
3922 	}
3923  out:
3924 	unlock_rsb(r);
3925 	put_rsb(r);
3926 }
3927 
3928 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3929 {
3930 	struct dlm_lkb *lkb;
3931 	int error;
3932 
3933 	error = find_lkb(ls, ms->m_remid, &lkb);
3934 	if (error) {
3935 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3936 			  ms->m_header.h_nodeid, ms->m_remid);
3937 		return;
3938 	}
3939 
3940 	_receive_cancel_reply(lkb, ms);
3941 	dlm_put_lkb(lkb);
3942 }
3943 
3944 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3945 {
3946 	struct dlm_lkb *lkb;
3947 	struct dlm_rsb *r;
3948 	int error, ret_nodeid;
3949 
3950 	error = find_lkb(ls, ms->m_lkid, &lkb);
3951 	if (error) {
3952 		log_error(ls, "receive_lookup_reply no lkb");
3953 		return;
3954 	}
3955 
3956 	/* ms->m_result is the value returned by dlm_dir_lookup on the dir node.
3957 	   FIXME: will a non-zero error ever be returned? */
3958 
3959 	r = lkb->lkb_resource;
3960 	hold_rsb(r);
3961 	lock_rsb(r);
3962 
3963 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3964 	if (error)
3965 		goto out;
3966 
3967 	ret_nodeid = ms->m_nodeid;
3968 	if (ret_nodeid == dlm_our_nodeid()) {
3969 		r->res_nodeid = 0;
3970 		ret_nodeid = 0;
3971 		r->res_first_lkid = 0;
3972 	} else {
3973 		/* set_master() will copy res_nodeid to lkb_nodeid */
3974 		r->res_nodeid = ret_nodeid;
3975 	}
3976 
3977 	if (is_overlap(lkb)) {
3978 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3979 			  lkb->lkb_id, lkb->lkb_flags);
3980 		queue_cast_overlap(r, lkb);
3981 		unhold_lkb(lkb); /* undoes create_lkb() */
3982 		goto out_list;
3983 	}
3984 
3985 	_request_lock(r, lkb);
3986 
3987  out_list:
3988 	if (!ret_nodeid)
3989 		process_lookup_list(r);
3990  out:
3991 	unlock_rsb(r);
3992 	put_rsb(r);
3993 	dlm_put_lkb(lkb);
3994 }
3995 
3996 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3997 {
3998 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3999 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
4000 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4001 			  ms->m_remid, ms->m_result);
4002 		return;
4003 	}
4004 
4005 	switch (ms->m_type) {
4006 
4007 	/* messages sent to a master node */
4008 
4009 	case DLM_MSG_REQUEST:
4010 		receive_request(ls, ms);
4011 		break;
4012 
4013 	case DLM_MSG_CONVERT:
4014 		receive_convert(ls, ms);
4015 		break;
4016 
4017 	case DLM_MSG_UNLOCK:
4018 		receive_unlock(ls, ms);
4019 		break;
4020 
4021 	case DLM_MSG_CANCEL:
4022 		receive_cancel(ls, ms);
4023 		break;
4024 
4025 	/* messages sent from a master node (replies to above) */
4026 
4027 	case DLM_MSG_REQUEST_REPLY:
4028 		receive_request_reply(ls, ms);
4029 		break;
4030 
4031 	case DLM_MSG_CONVERT_REPLY:
4032 		receive_convert_reply(ls, ms);
4033 		break;
4034 
4035 	case DLM_MSG_UNLOCK_REPLY:
4036 		receive_unlock_reply(ls, ms);
4037 		break;
4038 
4039 	case DLM_MSG_CANCEL_REPLY:
4040 		receive_cancel_reply(ls, ms);
4041 		break;
4042 
4043 	/* messages sent from a master node (only two types of async msg) */
4044 
4045 	case DLM_MSG_GRANT:
4046 		receive_grant(ls, ms);
4047 		break;
4048 
4049 	case DLM_MSG_BAST:
4050 		receive_bast(ls, ms);
4051 		break;
4052 
4053 	/* messages sent to a dir node */
4054 
4055 	case DLM_MSG_LOOKUP:
4056 		receive_lookup(ls, ms);
4057 		break;
4058 
4059 	case DLM_MSG_REMOVE:
4060 		receive_remove(ls, ms);
4061 		break;
4062 
4063 	/* messages sent from a dir node (remove has no reply) */
4064 
4065 	case DLM_MSG_LOOKUP_REPLY:
4066 		receive_lookup_reply(ls, ms);
4067 		break;
4068 
4069 	/* other messages */
4070 
4071 	case DLM_MSG_PURGE:
4072 		receive_purge(ls, ms);
4073 		break;
4074 
4075 	default:
4076 		log_error(ls, "unknown message type %d", ms->m_type);
4077 	}
4078 }
4079 
4080 /* If the lockspace is in recovery mode (locking stopped), then normal
4081    messages are saved on the requestqueue for processing after recovery is
4082    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4083    messages off the requestqueue before we process new ones.  This occurs right
4084    after recovery completes, when we transition from saving all messages on the
4085    requestqueue, to processing all the saved messages, to processing new
4086    messages as they arrive. */
4087 
4088 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4089 				int nodeid)
4090 {
4091 	if (dlm_locking_stopped(ls)) {
4092 		dlm_add_requestqueue(ls, nodeid, ms);
4093 	} else {
4094 		dlm_wait_requestqueue(ls);
4095 		_receive_message(ls, ms);
4096 	}
4097 }
4098 
4099 /* This is called by dlm_recoverd to process messages that were saved on
4100    the requestqueue. */
4101 
4102 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4103 {
4104 	_receive_message(ls, ms);
4105 }
4106 
4107 /* This is called by the midcomms layer when something is received for
4108    the lockspace.  It could be either a MSG (normal message sent as part of
4109    standard locking activity) or an RCOM (recovery message sent as part of
4110    lockspace recovery). */
4111 
4112 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4113 {
4114 	struct dlm_header *hd = &p->header;
4115 	struct dlm_ls *ls;
4116 	int type = 0;
4117 
4118 	switch (hd->h_cmd) {
4119 	case DLM_MSG:
4120 		dlm_message_in(&p->message);
4121 		type = p->message.m_type;
4122 		break;
4123 	case DLM_RCOM:
4124 		dlm_rcom_in(&p->rcom);
4125 		type = p->rcom.rc_type;
4126 		break;
4127 	default:
4128 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4129 		return;
4130 	}
4131 
4132 	if (hd->h_nodeid != nodeid) {
4133 		log_print("invalid h_nodeid %d from %d lockspace %x",
4134 			  hd->h_nodeid, nodeid, hd->h_lockspace);
4135 		return;
4136 	}
4137 
4138 	ls = dlm_find_lockspace_global(hd->h_lockspace);
4139 	if (!ls) {
4140 		if (dlm_config.ci_log_debug)
4141 			log_print("invalid lockspace %x from %d cmd %d type %d",
4142 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
4143 
4144 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4145 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4146 		return;
4147 	}
4148 
4149 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4150 	   be inactive (in this ls) before transitioning to recovery mode */
4151 
4152 	down_read(&ls->ls_recv_active);
4153 	if (hd->h_cmd == DLM_MSG)
4154 		dlm_receive_message(ls, &p->message, nodeid);
4155 	else
4156 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4157 	up_read(&ls->ls_recv_active);
4158 
4159 	dlm_put_lockspace(ls);
4160 }
4161 
4162 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4163 				   struct dlm_message *ms_stub)
4164 {
4165 	if (middle_conversion(lkb)) {
4166 		hold_lkb(lkb);
4167 		memset(ms_stub, 0, sizeof(struct dlm_message));
4168 		ms_stub->m_flags = DLM_IFL_STUB_MS;
4169 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4170 		ms_stub->m_result = -EINPROGRESS;
4171 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4172 		_receive_convert_reply(lkb, ms_stub);
4173 
4174 		/* Same special case as in receive_rcom_lock_args() */
4175 		lkb->lkb_grmode = DLM_LOCK_IV;
4176 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4177 		unhold_lkb(lkb);
4178 
4179 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4180 		lkb->lkb_flags |= DLM_IFL_RESEND;
4181 	}
4182 
4183 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4184 	   conversions are async; there's no reply from the remote master */
4185 }
4186 
4187 /* A waiting lkb needs recovery if the master node has failed, or
4188    the master node is changing (only when no directory is used) */
4189 
4190 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4191 {
4192 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
4193 		return 1;
4194 
4195 	if (!dlm_no_directory(ls))
4196 		return 0;
4197 
4198 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4199 		return 1;
4200 
4201 	return 0;
4202 }
4203 
4204 /* Recovery for locks that are waiting for replies from nodes that are now
4205    gone.  We can just complete unlocks and cancels by faking a reply from the
4206    dead node.  Requests and up-conversions we flag to be resent after
4207    recovery.  Down-conversions can just be completed with a fake reply like
4208    unlocks.  Conversions between PR and CW need special attention. */
4209 
4210 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4211 {
4212 	struct dlm_lkb *lkb, *safe;
4213 	struct dlm_message *ms_stub;
4214 	int wait_type, stub_unlock_result, stub_cancel_result;
4215 
4216 	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4217 	if (!ms_stub) {
4218 		log_error(ls, "dlm_recover_waiters_pre no mem");
4219 		return;
4220 	}
4221 
4222 	mutex_lock(&ls->ls_waiters_mutex);
4223 
4224 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4225 
4226 		/* exclude debug messages about unlocks because there can be so
4227 		   many and they aren't very interesting */
4228 
4229 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4230 			log_debug(ls, "recover_waiter %x nodeid %d "
4231 				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4232 				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4233 		}
4234 
4235 		/* all outstanding lookups, regardless of destination, will be
4236 		   resent after recovery is done */
4237 
4238 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4239 			lkb->lkb_flags |= DLM_IFL_RESEND;
4240 			continue;
4241 		}
4242 
4243 		if (!waiter_needs_recovery(ls, lkb))
4244 			continue;
4245 
4246 		wait_type = lkb->lkb_wait_type;
4247 		stub_unlock_result = -DLM_EUNLOCK;
4248 		stub_cancel_result = -DLM_ECANCEL;
4249 
4250 		/* Main reply may have been received leaving a zero wait_type,
4251 		   but a reply for the overlapping op may not have been
4252 		   received.  In that case we need to fake the appropriate
4253 		   reply for the overlap op. */
4254 
4255 		if (!wait_type) {
4256 			if (is_overlap_cancel(lkb)) {
4257 				wait_type = DLM_MSG_CANCEL;
4258 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4259 					stub_cancel_result = 0;
4260 			}
4261 			if (is_overlap_unlock(lkb)) {
4262 				wait_type = DLM_MSG_UNLOCK;
4263 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4264 					stub_unlock_result = -ENOENT;
4265 			}
4266 
4267 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4268 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
4269 				  stub_cancel_result, stub_unlock_result);
4270 		}
4271 
4272 		switch (wait_type) {
4273 
4274 		case DLM_MSG_REQUEST:
4275 			lkb->lkb_flags |= DLM_IFL_RESEND;
4276 			break;
4277 
4278 		case DLM_MSG_CONVERT:
4279 			recover_convert_waiter(ls, lkb, ms_stub);
4280 			break;
4281 
4282 		case DLM_MSG_UNLOCK:
4283 			hold_lkb(lkb);
4284 			memset(ms_stub, 0, sizeof(struct dlm_message));
4285 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4286 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4287 			ms_stub->m_result = stub_unlock_result;
4288 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4289 			_receive_unlock_reply(lkb, ms_stub);
4290 			dlm_put_lkb(lkb);
4291 			break;
4292 
4293 		case DLM_MSG_CANCEL:
4294 			hold_lkb(lkb);
4295 			memset(ms_stub, 0, sizeof(struct dlm_message));
4296 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4297 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4298 			ms_stub->m_result = stub_cancel_result;
4299 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4300 			_receive_cancel_reply(lkb, ms_stub);
4301 			dlm_put_lkb(lkb);
4302 			break;
4303 
4304 		default:
4305 			log_error(ls, "invalid lkb wait_type %d %d",
4306 				  lkb->lkb_wait_type, wait_type);
4307 		}
4308 		schedule();
4309 	}
4310 	mutex_unlock(&ls->ls_waiters_mutex);
4311 	kfree(ms_stub);
4312 }
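
/* For reference, a summary of the stub replies faked above (derived from the
   code, not a separate spec):

     wait_type        faked reply            m_result
     DLM_MSG_UNLOCK   DLM_MSG_UNLOCK_REPLY   -DLM_EUNLOCK, or -ENOENT for an
                                             overlapped unlock when
                                             lkb_grmode == DLM_LOCK_IV
     DLM_MSG_CANCEL   DLM_MSG_CANCEL_REPLY   -DLM_ECANCEL, or 0 for an
                                             overlapped cancel when
                                             lkb_grmode == DLM_LOCK_IV
     DLM_MSG_CONVERT  DLM_MSG_CONVERT_REPLY  -EINPROGRESS (middle conversions
                                             only; other waiters are flagged
                                             RESEND) */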
4313 
4314 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4315 {
4316 	struct dlm_lkb *lkb;
4317 	int found = 0;
4318 
4319 	mutex_lock(&ls->ls_waiters_mutex);
4320 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4321 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4322 			hold_lkb(lkb);
4323 			found = 1;
4324 			break;
4325 		}
4326 	}
4327 	mutex_unlock(&ls->ls_waiters_mutex);
4328 
4329 	if (!found)
4330 		lkb = NULL;
4331 	return lkb;
4332 }
4333 
4334 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4335    master or dir-node for r.  Processing the lkb may result in it being placed
4336    back on waiters. */
4337 
4338 /* We do this after normal locking has been enabled and any saved messages
4339    (in requestqueue) have been processed.  We should be confident that at
4340    this point we won't get or process a reply to any of these waiting
4341    operations.  But, new ops may be coming in on the rsbs/locks here from
4342    userspace or remotely. */
4343 
4344 /* there may have been an overlap unlock/cancel prior to recovery or after
4345    recovery.  if before, the lkb may still have a positive wait_count; if after,
4346    the overlap flag would just have been set and nothing new sent.  we can be
4347    confident here that any replies to either the initial op or overlap ops
4348    prior to recovery have been received. */
4349 
4350 int dlm_recover_waiters_post(struct dlm_ls *ls)
4351 {
4352 	struct dlm_lkb *lkb;
4353 	struct dlm_rsb *r;
4354 	int error = 0, mstype, err, oc, ou;
4355 
4356 	while (1) {
4357 		if (dlm_locking_stopped(ls)) {
4358 			log_debug(ls, "recover_waiters_post aborted");
4359 			error = -EINTR;
4360 			break;
4361 		}
4362 
4363 		lkb = find_resend_waiter(ls);
4364 		if (!lkb)
4365 			break;
4366 
4367 		r = lkb->lkb_resource;
4368 		hold_rsb(r);
4369 		lock_rsb(r);
4370 
4371 		mstype = lkb->lkb_wait_type;
4372 		oc = is_overlap_cancel(lkb);
4373 		ou = is_overlap_unlock(lkb);
4374 		err = 0;
4375 
4376 		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4377 			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4378 
4379 		/* At this point we assume that we won't get a reply to any
4380 		   previous op or overlap op on this lock.  First, do a big
4381 		   remove_from_waiters() for all previous ops. */
4382 
4383 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4384 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4385 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4386 		lkb->lkb_wait_type = 0;
4387 		lkb->lkb_wait_count = 0;
4388 		mutex_lock(&ls->ls_waiters_mutex);
4389 		list_del_init(&lkb->lkb_wait_reply);
4390 		mutex_unlock(&ls->ls_waiters_mutex);
4391 		unhold_lkb(lkb); /* for waiters list */
4392 
4393 		if (oc || ou) {
4394 			/* do an unlock or cancel instead of resending */
4395 			switch (mstype) {
4396 			case DLM_MSG_LOOKUP:
4397 			case DLM_MSG_REQUEST:
4398 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4399 							-DLM_ECANCEL);
4400 				unhold_lkb(lkb); /* undoes create_lkb() */
4401 				break;
4402 			case DLM_MSG_CONVERT:
4403 				if (oc) {
4404 					queue_cast(r, lkb, -DLM_ECANCEL);
4405 				} else {
4406 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4407 					_unlock_lock(r, lkb);
4408 				}
4409 				break;
4410 			default:
4411 				err = 1;
4412 			}
4413 		} else {
4414 			switch (mstype) {
4415 			case DLM_MSG_LOOKUP:
4416 			case DLM_MSG_REQUEST:
4417 				_request_lock(r, lkb);
4418 				if (is_master(r))
4419 					confirm_master(r, 0);
4420 				break;
4421 			case DLM_MSG_CONVERT:
4422 				_convert_lock(r, lkb);
4423 				break;
4424 			default:
4425 				err = 1;
4426 			}
4427 		}
4428 
4429 		if (err)
4430 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4431 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4432 		unlock_rsb(r);
4433 		put_rsb(r);
4434 		dlm_put_lkb(lkb);
4435 	}
4436 
4437 	return error;
4438 }
4439 
4440 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4441 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4442 {
4443 	struct dlm_ls *ls = r->res_ls;
4444 	struct dlm_lkb *lkb, *safe;
4445 
4446 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4447 		if (test(ls, lkb)) {
4448 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4449 			del_lkb(r, lkb);
4450 			/* this put should free the lkb */
4451 			if (!dlm_put_lkb(lkb))
4452 				log_error(ls, "purged lkb not released");
4453 		}
4454 	}
4455 }
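
/* purge_queue() takes its test as a function pointer so a single queue walk
   can serve both purge flavors below, e.g.:

	purge_queue(r, &r->res_grantqueue, &purge_dead_test);

   frees only master-copy lkbs whose node has been removed. */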
4456 
4457 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4458 {
4459 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4460 }
4461 
4462 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4463 {
4464 	return is_master_copy(lkb);
4465 }
4466 
4467 static void purge_dead_locks(struct dlm_rsb *r)
4468 {
4469 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4470 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4471 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4472 }
4473 
4474 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4475 {
4476 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4477 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4478 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4479 }
4480 
4481 /* Get rid of locks held by nodes that are gone. */
4482 
4483 int dlm_purge_locks(struct dlm_ls *ls)
4484 {
4485 	struct dlm_rsb *r;
4486 
4487 	log_debug(ls, "dlm_purge_locks");
4488 
4489 	down_write(&ls->ls_root_sem);
4490 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4491 		hold_rsb(r);
4492 		lock_rsb(r);
4493 		if (is_master(r))
4494 			purge_dead_locks(r);
4495 		unlock_rsb(r);
4496 		unhold_rsb(r);
4497 
4498 		schedule();
4499 	}
4500 	up_write(&ls->ls_root_sem);
4501 
4502 	return 0;
4503 }
4504 
4505 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4506 {
4507 	struct rb_node *n;
4508 	struct dlm_rsb *r, *r_ret = NULL;
4509 
4510 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4511 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4512 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
4513 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4514 			continue;
4515 		hold_rsb(r);
4516 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4517 		r_ret = r;
4518 		break;
4519 	}
4520 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4521 	return r_ret;
4522 }
4523 
4524 void dlm_grant_after_purge(struct dlm_ls *ls)
4525 {
4526 	struct dlm_rsb *r;
4527 	int bucket = 0;
4528 
4529 	while (1) {
4530 		r = find_purged_rsb(ls, bucket);
4531 		if (!r) {
4532 			if (bucket == ls->ls_rsbtbl_size - 1)
4533 				break;
4534 			bucket++;
4535 			continue;
4536 		}
4537 		lock_rsb(r);
4538 		if (is_master(r)) {
4539 			grant_pending_locks(r);
4540 			confirm_master(r, 0);
4541 		}
4542 		unlock_rsb(r);
4543 		put_rsb(r);
4544 		schedule();
4545 	}
4546 }
4547 
4548 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4549 					 uint32_t remid)
4550 {
4551 	struct dlm_lkb *lkb;
4552 
4553 	list_for_each_entry(lkb, head, lkb_statequeue) {
4554 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4555 			return lkb;
4556 	}
4557 	return NULL;
4558 }
4559 
4560 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4561 				    uint32_t remid)
4562 {
4563 	struct dlm_lkb *lkb;
4564 
4565 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4566 	if (lkb)
4567 		return lkb;
4568 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4569 	if (lkb)
4570 		return lkb;
4571 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4572 	if (lkb)
4573 		return lkb;
4574 	return NULL;
4575 }
4576 
4577 /* needs at least dlm_rcom + rcom_lock */
4578 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4579 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4580 {
4581 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4582 
4583 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4584 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4585 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4586 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4587 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4588 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4589 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4590 	lkb->lkb_rqmode = rl->rl_rqmode;
4591 	lkb->lkb_grmode = rl->rl_grmode;
4592 	/* don't set lkb_status because add_lkb wants to itself */
4593 	/* don't set lkb_status because add_lkb wants to set it itself */
4594 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4595 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4596 
4597 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4598 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4599 			 sizeof(struct rcom_lock);
4600 		if (lvblen > ls->ls_lvblen)
4601 			return -EINVAL;
4602 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4603 		if (!lkb->lkb_lvbptr)
4604 			return -ENOMEM;
4605 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4606 	}
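
	/* worked example (illustrative): if h_length is sizeof(struct dlm_rcom)
	   + sizeof(struct rcom_lock) + 32, then lvblen above computes to 32,
	   which must not exceed the lockspace's ls_lvblen */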
4607 
4608 	/* Conversions between PR and CW (middle modes) need special handling.
4609 	   The real granted mode of these converting locks cannot be determined
4610 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4611 
4612 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4613 	    middle_conversion(lkb)) {
4614 		rl->rl_status = DLM_LKSTS_CONVERT;
4615 		lkb->lkb_grmode = DLM_LOCK_IV;
4616 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4617 	}
4618 
4619 	return 0;
4620 }
4621 
4622 /* This lkb may have been recovered in a previous aborted recovery, so we need
4623    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4624    If so, we just send back a standard reply.  If not, we create a new lkb with
4625    the given values and send back our lkid.  We send back our lkid by sending
4626    back the rcom_lock struct we got but with the remid field filled in. */
4627 
4628 /* needs at least dlm_rcom + rcom_lock */
4629 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4630 {
4631 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4632 	struct dlm_rsb *r;
4633 	struct dlm_lkb *lkb;
4634 	int error;
4635 
4636 	if (rl->rl_parent_lkid) {
4637 		error = -EOPNOTSUPP;
4638 		goto out;
4639 	}
4640 
4641 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4642 			 R_MASTER, &r);
4643 	if (error)
4644 		goto out;
4645 
4646 	lock_rsb(r);
4647 
4648 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4649 	if (lkb) {
4650 		error = -EEXIST;
4651 		goto out_remid;
4652 	}
4653 
4654 	error = create_lkb(ls, &lkb);
4655 	if (error)
4656 		goto out_unlock;
4657 
4658 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4659 	if (error) {
4660 		__put_lkb(ls, lkb);
4661 		goto out_unlock;
4662 	}
4663 
4664 	attach_lkb(r, lkb);
4665 	add_lkb(r, lkb, rl->rl_status);
4666 	error = 0;
4667 
4668  out_remid:
4669 	/* this is the new value returned to the lock holder for
4670 	   saving in its process-copy lkb */
4671 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4672 
4673  out_unlock:
4674 	unlock_rsb(r);
4675 	put_rsb(r);
4676  out:
4677 	if (error)
4678 		log_debug(ls, "recover_master_copy %d %x", error,
4679 			  le32_to_cpu(rl->rl_lkid));
4680 	rl->rl_result = cpu_to_le32(error);
4681 	return error;
4682 }
4683 
4684 /* needs at least dlm_rcom + rcom_lock */
4685 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4686 {
4687 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4688 	struct dlm_rsb *r;
4689 	struct dlm_lkb *lkb;
4690 	int error;
4691 
4692 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4693 	if (error) {
4694 		log_error(ls, "recover_process_copy no lkid %x",
4695 				le32_to_cpu(rl->rl_lkid));
4696 		return error;
4697 	}
4698 
4699 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4700 
4701 	error = le32_to_cpu(rl->rl_result);
4702 
4703 	r = lkb->lkb_resource;
4704 	hold_rsb(r);
4705 	lock_rsb(r);
4706 
4707 	switch (error) {
4708 	case -EBADR:
4709 		/* There's a chance the new master received our lock before
4710 		   dlm_recover_master_reply(); this wouldn't happen if we did
4711 		   a barrier between recover_masters and recover_locks. */
4712 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4713 			  (unsigned long)r, r->res_name);
4714 		dlm_send_rcom_lock(r, lkb);
4715 		goto out;
4716 	case -EEXIST:
4717 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4718 		/* fall through */
4719 	case 0:
4720 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4721 		break;
4722 	default:
4723 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4724 			  error, lkb->lkb_id);
4725 	}
4726 
4727 	/* an ack for dlm_recover_locks() which waits for replies from
4728 	   all the locks it sends to new masters */
4729 	dlm_recovered_lock(r);
4730  out:
4731 	unlock_rsb(r);
4732 	put_rsb(r);
4733 	dlm_put_lkb(lkb);
4734 
4735 	return 0;
4736 }
4737 
4738 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4739 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4740 		     unsigned long timeout_cs)
4741 {
4742 	struct dlm_lkb *lkb;
4743 	struct dlm_args args;
4744 	int error;
4745 
4746 	dlm_lock_recovery(ls);
4747 
4748 	error = create_lkb(ls, &lkb);
4749 	if (error) {
4750 		kfree(ua);
4751 		goto out;
4752 	}
4753 
4754 	if (flags & DLM_LKF_VALBLK) {
4755 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4756 		if (!ua->lksb.sb_lvbptr) {
4757 			kfree(ua);
4758 			__put_lkb(ls, lkb);
4759 			error = -ENOMEM;
4760 			goto out;
4761 		}
4762 	}
4763 
4764 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4765 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4766 	   lock and that lkb_astparam is the dlm_user_args structure. */
4767 
4768 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4769 			      fake_astfn, ua, fake_bastfn, &args);
4770 	lkb->lkb_flags |= DLM_IFL_USER;
4771 
4772 	if (error) {
4773 		__put_lkb(ls, lkb);
4774 		goto out;
4775 	}
4776 
4777 	error = request_lock(ls, lkb, name, namelen, &args);
4778 
4779 	switch (error) {
4780 	case 0:
4781 		break;
4782 	case -EINPROGRESS:
4783 		error = 0;
4784 		break;
4785 	case -EAGAIN:
4786 		error = 0;
4787 		/* fall through */
4788 	default:
4789 		__put_lkb(ls, lkb);
4790 		goto out;
4791 	}
4792 
4793 	/* add this new lkb to the per-process list of locks */
4794 	spin_lock(&ua->proc->locks_spin);
4795 	hold_lkb(lkb);
4796 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4797 	spin_unlock(&ua->proc->locks_spin);
4798  out:
4799 	dlm_unlock_recovery(ls);
4800 	return error;
4801 }
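
/* For comparison with the userspace request above, a minimal sketch of the
   kernel-space equivalent using dlm_lock() (assumes a lockspace created
   elsewhere with dlm_new_lockspace(); "ls", "my_ast" and "my_bast" are
   hypothetical caller-supplied names):

	static struct dlm_lksb lksb;

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 my_ast, &lksb, my_bast);

   A zero return only means the request was accepted; the grant (or -EAGAIN,
   etc.) shows up later in lksb.sb_status when my_ast() runs. */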
4802 
4803 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4804 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4805 		     unsigned long timeout_cs)
4806 {
4807 	struct dlm_lkb *lkb;
4808 	struct dlm_args args;
4809 	struct dlm_user_args *ua;
4810 	int error;
4811 
4812 	dlm_lock_recovery(ls);
4813 
4814 	error = find_lkb(ls, lkid, &lkb);
4815 	if (error)
4816 		goto out;
4817 
4818 	/* user can change the params on its lock when it converts it, or
4819 	   add an lvb that didn't exist before */
4820 
4821 	ua = lkb->lkb_ua;
4822 
4823 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4824 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4825 		if (!ua->lksb.sb_lvbptr) {
4826 			error = -ENOMEM;
4827 			goto out_put;
4828 		}
4829 	}
4830 	if (lvb_in && ua->lksb.sb_lvbptr)
4831 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4832 
4833 	ua->xid = ua_tmp->xid;
4834 	ua->castparam = ua_tmp->castparam;
4835 	ua->castaddr = ua_tmp->castaddr;
4836 	ua->bastparam = ua_tmp->bastparam;
4837 	ua->bastaddr = ua_tmp->bastaddr;
4838 	ua->user_lksb = ua_tmp->user_lksb;
4839 
4840 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4841 			      fake_astfn, ua, fake_bastfn, &args);
4842 	if (error)
4843 		goto out_put;
4844 
4845 	error = convert_lock(ls, lkb, &args);
4846 
4847 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4848 		error = 0;
4849  out_put:
4850 	dlm_put_lkb(lkb);
4851  out:
4852 	dlm_unlock_recovery(ls);
4853 	kfree(ua_tmp);
4854 	return error;
4855 }
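
/* Kernel-space analogue of the convert path above (a sketch with the same
   hypothetical names as the previous example).  A conversion reuses the lksb
   whose sb_lkid the original request filled in, so no resource name is passed:

	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, NULL, 0, 0,
			 my_ast, &lksb, my_bast);

   As in dlm_user_convert() above, which squashes -EINPROGRESS/-EAGAIN/-EDEADLK
   to 0 for its caller, the definitive result is delivered in sb_status via
   the ast. */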
4856 
4857 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4858 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4859 {
4860 	struct dlm_lkb *lkb;
4861 	struct dlm_args args;
4862 	struct dlm_user_args *ua;
4863 	int error;
4864 
4865 	dlm_lock_recovery(ls);
4866 
4867 	error = find_lkb(ls, lkid, &lkb);
4868 	if (error)
4869 		goto out;
4870 
4871 	ua = lkb->lkb_ua;
4872 
4873 	if (lvb_in && ua->lksb.sb_lvbptr)
4874 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4875 	if (ua_tmp->castparam)
4876 		ua->castparam = ua_tmp->castparam;
4877 	ua->user_lksb = ua_tmp->user_lksb;
4878 
4879 	error = set_unlock_args(flags, ua, &args);
4880 	if (error)
4881 		goto out_put;
4882 
4883 	error = unlock_lock(ls, lkb, &args);
4884 
4885 	if (error == -DLM_EUNLOCK)
4886 		error = 0;
4887 	/* from validate_unlock_args() */
4888 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4889 		error = 0;
4890 	if (error)
4891 		goto out_put;
4892 
4893 	spin_lock(&ua->proc->locks_spin);
4894 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
4895 	if (!list_empty(&lkb->lkb_ownqueue))
4896 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4897 	spin_unlock(&ua->proc->locks_spin);
4898  out_put:
4899 	dlm_put_lkb(lkb);
4900  out:
4901 	dlm_unlock_recovery(ls);
4902 	kfree(ua_tmp);
4903 	return error;
4904 }
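
/* Kernel-space analogue of the unlock path above, plus the cancel variant
   used by dlm_user_cancel() below (a sketch; same hypothetical names):

	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &lksb);
	error = dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, &lksb);

   Completion is again asynchronous: sb_status becomes -DLM_EUNLOCK or
   -DLM_ECANCEL when the ast fires. */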
4905 
4906 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4907 		    uint32_t flags, uint32_t lkid)
4908 {
4909 	struct dlm_lkb *lkb;
4910 	struct dlm_args args;
4911 	struct dlm_user_args *ua;
4912 	int error;
4913 
4914 	dlm_lock_recovery(ls);
4915 
4916 	error = find_lkb(ls, lkid, &lkb);
4917 	if (error)
4918 		goto out;
4919 
4920 	ua = lkb->lkb_ua;
4921 	if (ua_tmp->castparam)
4922 		ua->castparam = ua_tmp->castparam;
4923 	ua->user_lksb = ua_tmp->user_lksb;
4924 
4925 	error = set_unlock_args(flags, ua, &args);
4926 	if (error)
4927 		goto out_put;
4928 
4929 	error = cancel_lock(ls, lkb, &args);
4930 
4931 	if (error == -DLM_ECANCEL)
4932 		error = 0;
4933 	/* from validate_unlock_args() */
4934 	if (error == -EBUSY)
4935 		error = 0;
4936  out_put:
4937 	dlm_put_lkb(lkb);
4938  out:
4939 	dlm_unlock_recovery(ls);
4940 	kfree(ua_tmp);
4941 	return error;
4942 }
4943 
4944 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4945 {
4946 	struct dlm_lkb *lkb;
4947 	struct dlm_args args;
4948 	struct dlm_user_args *ua;
4949 	struct dlm_rsb *r;
4950 	int error;
4951 
4952 	dlm_lock_recovery(ls);
4953 
4954 	error = find_lkb(ls, lkid, &lkb);
4955 	if (error)
4956 		goto out;
4957 
4958 	ua = lkb->lkb_ua;
4959 
4960 	error = set_unlock_args(flags, ua, &args);
4961 	if (error)
4962 		goto out_put;
4963 
4964 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4965 
4966 	r = lkb->lkb_resource;
4967 	hold_rsb(r);
4968 	lock_rsb(r);
4969 
4970 	error = validate_unlock_args(lkb, &args);
4971 	if (error)
4972 		goto out_r;
4973 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4974 
4975 	error = _cancel_lock(r, lkb);
4976  out_r:
4977 	unlock_rsb(r);
4978 	put_rsb(r);
4979 
4980 	if (error == -DLM_ECANCEL)
4981 		error = 0;
4982 	/* from validate_unlock_args() */
4983 	if (error == -EBUSY)
4984 		error = 0;
4985  out_put:
4986 	dlm_put_lkb(lkb);
4987  out:
4988 	dlm_unlock_recovery(ls);
4989 	return error;
4990 }
4991 
4992 /* lkb's that are removed from the waiters list by revert are just left on the
4993    orphans list with the granted orphan locks, to be freed by purge */
4994 
4995 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4996 {
4997 	struct dlm_args args;
4998 	int error;
4999 
5000 	hold_lkb(lkb);
5001 	mutex_lock(&ls->ls_orphans_mutex);
5002 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5003 	mutex_unlock(&ls->ls_orphans_mutex);
5004 
5005 	set_unlock_args(0, lkb->lkb_ua, &args);
5006 
5007 	error = cancel_lock(ls, lkb, &args);
5008 	if (error == -DLM_ECANCEL)
5009 		error = 0;
5010 	return error;
5011 }
5012 
5013 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5014    Regardless of what rsb queue the lock is on, it's removed and freed. */
5015 
5016 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5017 {
5018 	struct dlm_args args;
5019 	int error;
5020 
5021 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5022 
5023 	error = unlock_lock(ls, lkb, &args);
5024 	if (error == -DLM_EUNLOCK)
5025 		error = 0;
5026 	return error;
5027 }
5028 
5029 /* We have to release the clear_proc_locks mutex before calling
5030    unlock_proc_lock() (which does lock_rsb) to avoid deadlocking against a
5031    received message that does lock_rsb followed by dlm_user_add_cb() */
5032 
5033 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5034 				     struct dlm_user_proc *proc)
5035 {
5036 	struct dlm_lkb *lkb = NULL;
5037 
5038 	mutex_lock(&ls->ls_clear_proc_locks);
5039 	if (list_empty(&proc->locks))
5040 		goto out;
5041 
5042 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5043 	list_del_init(&lkb->lkb_ownqueue);
5044 
5045 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5046 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
5047 	else
5048 		lkb->lkb_flags |= DLM_IFL_DEAD;
5049  out:
5050 	mutex_unlock(&ls->ls_clear_proc_locks);
5051 	return lkb;
5052 }
5053 
5054 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5055    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5056    which we clear here. */
5057 
5058 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5059    list, and no more device_writes should add lkb's to proc->locks list; so we
5060    shouldn't need to take asts_spin or locks_spin here.  this assumes that
5061    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5062    them ourselves. */
5063 
5064 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5065 {
5066 	struct dlm_lkb *lkb, *safe;
5067 
5068 	dlm_lock_recovery(ls);
5069 
5070 	while (1) {
5071 		lkb = del_proc_lock(ls, proc);
5072 		if (!lkb)
5073 			break;
5074 		del_timeout(lkb);
5075 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5076 			orphan_proc_lock(ls, lkb);
5077 		else
5078 			unlock_proc_lock(ls, lkb);
5079 
5080 		/* this removes the reference for the proc->locks list
5081 		   added by dlm_user_request, it may result in the lkb
5082 		   being freed */
5083 
5084 		dlm_put_lkb(lkb);
5085 	}
5086 
5087 	mutex_lock(&ls->ls_clear_proc_locks);
5088 
5089 	/* in-progress unlocks */
5090 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5091 		list_del_init(&lkb->lkb_ownqueue);
5092 		lkb->lkb_flags |= DLM_IFL_DEAD;
5093 		dlm_put_lkb(lkb);
5094 	}
5095 
5096 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5097 		memset(&lkb->lkb_callbacks, 0,
5098 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5099 		list_del_init(&lkb->lkb_cb_list);
5100 		dlm_put_lkb(lkb);
5101 	}
5102 
5103 	mutex_unlock(&ls->ls_clear_proc_locks);
5104 	dlm_unlock_recovery(ls);
5105 }
5106 
5107 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5108 {
5109 	struct dlm_lkb *lkb, *safe;
5110 
5111 	while (1) {
5112 		lkb = NULL;
5113 		spin_lock(&proc->locks_spin);
5114 		if (!list_empty(&proc->locks)) {
5115 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
5116 					 lkb_ownqueue);
5117 			list_del_init(&lkb->lkb_ownqueue);
5118 		}
5119 		spin_unlock(&proc->locks_spin);
5120 
5121 		if (!lkb)
5122 			break;
5123 
5124 		lkb->lkb_flags |= DLM_IFL_DEAD;
5125 		unlock_proc_lock(ls, lkb);
5126 		dlm_put_lkb(lkb); /* ref from proc->locks list */
5127 	}
5128 
5129 	spin_lock(&proc->locks_spin);
5130 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5131 		list_del_init(&lkb->lkb_ownqueue);
5132 		lkb->lkb_flags |= DLM_IFL_DEAD;
5133 		dlm_put_lkb(lkb);
5134 	}
5135 	spin_unlock(&proc->locks_spin);
5136 
5137 	spin_lock(&proc->asts_spin);
5138 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5139 		memset(&lkb->lkb_callbacks, 0,
5140 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5141 		list_del_init(&lkb->lkb_cb_list);
5142 		dlm_put_lkb(lkb);
5143 	}
5144 	spin_unlock(&proc->asts_spin);
5145 }
5146 
5147 /* pid of 0 means purge all orphans */
5148 
5149 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5150 {
5151 	struct dlm_lkb *lkb, *safe;
5152 
5153 	mutex_lock(&ls->ls_orphans_mutex);
5154 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5155 		if (pid && lkb->lkb_ownpid != pid)
5156 			continue;
5157 		unlock_proc_lock(ls, lkb);
5158 		list_del_init(&lkb->lkb_ownqueue);
5159 		dlm_put_lkb(lkb);
5160 	}
5161 	mutex_unlock(&ls->ls_orphans_mutex);
5162 }
5163 
5164 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5165 {
5166 	struct dlm_message *ms;
5167 	struct dlm_mhandle *mh;
5168 	int error;
5169 
5170 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5171 				DLM_MSG_PURGE, &ms, &mh);
5172 	if (error)
5173 		return error;
5174 	ms->m_nodeid = nodeid;
5175 	ms->m_pid = pid;
5176 
5177 	return send_message(mh, ms);
5178 }
5179 
5180 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5181 		   int nodeid, int pid)
5182 {
5183 	int error = 0;
5184 
5185 	if (nodeid != dlm_our_nodeid()) {
5186 		error = send_purge(ls, nodeid, pid);
5187 	} else {
5188 		dlm_lock_recovery(ls);
5189 		if (pid == current->pid)
5190 			purge_proc_locks(ls, proc);
5191 		else
5192 			do_purge(ls, nodeid, pid);
5193 		dlm_unlock_recovery(ls);
5194 	}
5195 	return error;
5196 }
5197 
5198