/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(); when local, it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
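
/*
 * Illustrative walk-through of the stages above (not a new code path, just
 * tracing existing calls): a request whose rsb is mastered locally runs
 * entirely on this node,
 *
 *   dlm_lock() -> request_lock() -> _request_lock() -> do_request()
 *
 * while the same request against a remotely mastered rsb stops at stage 3
 * and goes over the wire,
 *
 *   dlm_lock() -> request_lock() -> _request_lock() -> send_request()
 *
 * with do_request() then running on the master node via the corresponding
 * receive_xxxx() handler.
 */
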
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
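
/*
 * Example (illustrative): a granted PR lock checked against a PW request
 * gives
 *
 *   __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PW + 1] == 0
 *
 * (conflict), while PR against PR or against CR gives 1 (compatible).
 */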

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
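
/*
 * Example (illustrative): converting up from NL to EX returns the
 * resource's LVB to the caller,
 *
 *   dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1
 *
 * while converting down from EX to NL writes the caller's LVB to the
 * resource,
 *
 *   dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0
 */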

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
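
/*
 * Example (illustrative): this matrix flags which conversions QUECVT may
 * be used with.  NL -> EX qualifies,
 *
 *   __quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1
 *
 * while an EX lock has nowhere more restrictive to go, so its row is all
 * zeroes.
 */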

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode == DLM_LOCK_PR && lkb->lkb_rqmode == DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode == DLM_LOCK_PR && lkb->lkb_grmode == DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
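
/*
 * Example (illustrative): gr=PR rq=CW (or gr=CW rq=PR) is a "middle"
 * conversion: neither mode includes the other, and the two are mutually
 * incompatible in __dlm_compat_matrix.  gr=EX rq=PR, by contrast, is a
 * plain down-conversion since the requested mode is weaker and no PR/CW
 * pair is involved.
 */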

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, return -DLM_ECANCEL; if a timeout
	   caused the cancel, return -ETIMEDOUT instead */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}

/* If ls->ls_new_rsb is empty, return -EAGAIN so the caller can unlock
   any spinlocks, go back, and call pre_rsb_struct again.  Otherwise,
   take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
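
/*
 * Illustrative note (assuming res_name buffers are zeroed at allocation):
 * since the search key is zero-padded to DLM_RESNAME_MAXLEN before the
 * memcmp(), names compare as fixed-width strings, so e.g. "foo" sorts
 * before "foobar".
 */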

static int search_rsb_tree(struct rb_root *tree, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int error = 0;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	if (error)
		return error;

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, bucket;
	int error;

	if (namelen > DLM_RESNAME_MAXLEN) {
		error = -EINVAL;
		goto out;
	}

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

 retry:
	if (flags & R_CREATE) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);

	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out_unlock;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out_unlock;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out_unlock;

	error = get_rsb_struct(ls, name, namelen, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}
	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
 out:
	*r_ret = r;
	return error;
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	int rv, id;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

 retry:
	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
	if (!rv)
		lkb->lkb_id = id;
	spin_unlock(&ls->ls_lkbidr_spin);

	if (rv == -EAGAIN)
		goto retry;

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
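
/*
 * Example (illustrative): if a queue holds locks in modes EX, PR, NL,
 * lkb_add_ordered() keeps it in descending mode order; a new PR entry is
 * inserted in front of the first lower-mode entry (NL here), i.e. after
 * any existing entries of the same mode.
 */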

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: make this more efficient */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
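
/*
 * Example (illustrative), tying back to dlm_lvb_operations: an NL -> EX
 * convert hits the b == 1 branch above and copies the resource's LVB out
 * to the caller, while an EX -> NL convert hits b == 0 and writes the
 * caller's LVB into the resource, bumping res_lvbseq.
 */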

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
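
/*
 * Example (illustrative): a brand new EX request (now=1, conv=0) on a
 * resource with empty convert and wait queues and only an NL lock granted
 * passes every check above and is granted immediately; the same request
 * arriving behind other waiters stays on the wait queue until it reaches
 * the front (the final !now condition).
 */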
1785 
1786 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1787 			  int *err)
1788 {
1789 	int rv;
1790 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1791 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1792 
1793 	if (err)
1794 		*err = 0;
1795 
1796 	rv = _can_be_granted(r, lkb, now);
1797 	if (rv)
1798 		goto out;
1799 
1800 	/*
1801 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1802 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1803 	 * cancels one of the locks.
1804 	 */
1805 
1806 	if (is_convert && can_be_queued(lkb) &&
1807 	    conversion_deadlock_detect(r, lkb)) {
1808 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1809 			lkb->lkb_grmode = DLM_LOCK_NL;
1810 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1811 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1812 			if (err)
1813 				*err = -EDEADLK;
1814 			else {
1815 				log_print("can_be_granted deadlock %x now %d",
1816 					  lkb->lkb_id, now);
1817 				dlm_dump_rsb(r);
1818 			}
1819 		}
1820 		goto out;
1821 	}
1822 
1823 	/*
1824 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1825 	 * to grant a request in a mode other than the normal rqmode.  It's a
1826 	 * simple way to provide a big optimization to applications that can
1827 	 * use them.
1828 	 */
1829 
1830 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1831 		alt = DLM_LOCK_PR;
1832 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1833 		alt = DLM_LOCK_CW;
1834 
1835 	if (alt) {
1836 		lkb->lkb_rqmode = alt;
1837 		rv = _can_be_granted(r, lkb, now);
1838 		if (rv)
1839 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1840 		else
1841 			lkb->lkb_rqmode = rqmode;
1842 	}
1843  out:
1844 	return rv;
1845 }
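
/*
 * Editor's sketch (not part of the original file): how a caller can tell
 * that the ALTPR/ALTCW path above granted an alternate mode.  The lksb
 * here is hypothetical; DLM_SBF_ALTMODE is the flag set just above.
 */
static bool example_granted_altmode(struct dlm_lksb *lksb)
{
	/* set by can_be_granted() when the lock was granted in the
	   alternate mode rather than the originally requested one */
	return lksb->sb_flags & DLM_SBF_ALTMODE;
}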
1846 
1847 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1848    for locks pending on the convert list.  Once verified (watch for these
1849    log_prints), we should be able to just call _can_be_granted() and not
1850    bother with the demote/deadlk cases here (and there's no easy way to deal
1851    with a deadlk here, we'd have to generate something like grant_lock with
1852    the deadlk error.) */
1853 
1854 /* Returns the highest requested mode of all blocked conversions; sets
1855    cw if there's a blocked conversion to DLM_LOCK_CW. */
1856 
1857 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1858 {
1859 	struct dlm_lkb *lkb, *s;
1860 	int hi, demoted, quit, grant_restart, demote_restart;
1861 	int deadlk;
1862 
1863 	quit = 0;
1864  restart:
1865 	grant_restart = 0;
1866 	demote_restart = 0;
1867 	hi = DLM_LOCK_IV;
1868 
1869 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1870 		demoted = is_demoted(lkb);
1871 		deadlk = 0;
1872 
1873 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1874 			grant_lock_pending(r, lkb);
1875 			grant_restart = 1;
1876 			continue;
1877 		}
1878 
1879 		if (!demoted && is_demoted(lkb)) {
1880 			log_print("WARN: pending demoted %x node %d %s",
1881 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1882 			demote_restart = 1;
1883 			continue;
1884 		}
1885 
1886 		if (deadlk) {
1887 			log_print("WARN: pending deadlock %x node %d %s",
1888 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1889 			dlm_dump_rsb(r);
1890 			continue;
1891 		}
1892 
1893 		hi = max_t(int, lkb->lkb_rqmode, hi);
1894 
1895 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1896 			*cw = 1;
1897 	}
1898 
1899 	if (grant_restart)
1900 		goto restart;
1901 	if (demote_restart && !quit) {
1902 		quit = 1;
1903 		goto restart;
1904 	}
1905 
1906 	return max_t(int, high, hi);
1907 }
1908 
1909 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1910 {
1911 	struct dlm_lkb *lkb, *s;
1912 
1913 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1914 		if (can_be_granted(r, lkb, 0, NULL))
1915 			grant_lock_pending(r, lkb);
1916 		else {
1917 			high = max_t(int, lkb->lkb_rqmode, high);
1918 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1919 				*cw = 1;
1920 		}
1921 	}
1922 
1923 	return high;
1924 }
1925 
1926 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1927    on either the convert or waiting queue.
1928    high is the largest rqmode of all locks blocked on the convert or
1929    waiting queue. */
1930 
1931 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1932 {
1933 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1934 		if (gr->lkb_highbast < DLM_LOCK_EX)
1935 			return 1;
1936 		return 0;
1937 	}
1938 
1939 	if (gr->lkb_highbast < high &&
1940 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1941 		return 1;
1942 	return 0;
1943 }
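
/*
 * Editor's note (not part of the original file): lkb_highbast records the
 * highest mode for which a bast has already been sent, so a lock holder
 * is not spammed with repeat basts.  A hedged sketch of the send-once
 * pattern used by grant_pending_locks() below:
 */
static void example_bast_once(struct dlm_rsb *r, struct dlm_lkb *gr, int high)
{
	/* only bast if this blocked mode is news to the holder */
	if (gr->lkb_highbast < high) {
		queue_bast(r, gr, high);
		gr->lkb_highbast = high;
	}
}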
1944 
1945 static void grant_pending_locks(struct dlm_rsb *r)
1946 {
1947 	struct dlm_lkb *lkb, *s;
1948 	int high = DLM_LOCK_IV;
1949 	int cw = 0;
1950 
1951 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1952 
1953 	high = grant_pending_convert(r, high, &cw);
1954 	high = grant_pending_wait(r, high, &cw);
1955 
1956 	if (high == DLM_LOCK_IV)
1957 		return;
1958 
1959 	/*
1960 	 * If there are locks left on the wait/convert queue then send blocking
1961 	 * ASTs to granted locks based on the largest requested mode (high)
1962 	 * found above.
1963 	 */
1964 
1965 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1966 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1967 			if (cw && high == DLM_LOCK_PR &&
1968 			    lkb->lkb_grmode == DLM_LOCK_PR)
1969 				queue_bast(r, lkb, DLM_LOCK_CW);
1970 			else
1971 				queue_bast(r, lkb, high);
1972 			lkb->lkb_highbast = high;
1973 		}
1974 	}
1975 }
1976 
1977 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1978 {
1979 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1980 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1981 		if (gr->lkb_highbast < DLM_LOCK_EX)
1982 			return 1;
1983 		return 0;
1984 	}
1985 
1986 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1987 		return 1;
1988 	return 0;
1989 }
1990 
1991 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1992 			    struct dlm_lkb *lkb)
1993 {
1994 	struct dlm_lkb *gr;
1995 
1996 	list_for_each_entry(gr, head, lkb_statequeue) {
1997 		/* skip self when sending basts to convertqueue */
1998 		if (gr == lkb)
1999 			continue;
2000 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2001 			queue_bast(r, gr, lkb->lkb_rqmode);
2002 			gr->lkb_highbast = lkb->lkb_rqmode;
2003 		}
2004 	}
2005 }
2006 
2007 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2008 {
2009 	send_bast_queue(r, &r->res_grantqueue, lkb);
2010 }
2011 
2012 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2013 {
2014 	send_bast_queue(r, &r->res_grantqueue, lkb);
2015 	send_bast_queue(r, &r->res_convertqueue, lkb);
2016 }
2017 
2018 /* set_master(r, lkb) -- set the master nodeid of a resource
2019 
2020    The purpose of this function is to set the nodeid field in the given
2021    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2022    known, it can just be copied to the lkb and the function will return
2023    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2024    before it can be copied to the lkb.
2025 
2026    When the rsb nodeid is being looked up remotely, the initial lkb
2027    causing the lookup is kept on the ls_waiters list waiting for the
2028    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2029    on the rsb's res_lookup list until the master is verified.
2030 
2031    Return values:
2032    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2033    1: the rsb master is not available and the lkb has been placed on
2034       a wait queue
2035 */
2036 
2037 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2038 {
2039 	struct dlm_ls *ls = r->res_ls;
2040 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
2041 
2042 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2043 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2044 		r->res_first_lkid = lkb->lkb_id;
2045 		lkb->lkb_nodeid = r->res_nodeid;
2046 		return 0;
2047 	}
2048 
2049 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2050 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2051 		return 1;
2052 	}
2053 
2054 	if (r->res_nodeid == 0) {
2055 		lkb->lkb_nodeid = 0;
2056 		return 0;
2057 	}
2058 
2059 	if (r->res_nodeid > 0) {
2060 		lkb->lkb_nodeid = r->res_nodeid;
2061 		return 0;
2062 	}
2063 
2064 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2065 
2066 	dir_nodeid = dlm_dir_nodeid(r);
2067 
2068 	if (dir_nodeid != our_nodeid) {
2069 		r->res_first_lkid = lkb->lkb_id;
2070 		send_lookup(r, lkb);
2071 		return 1;
2072 	}
2073 
2074 	for (i = 0; i < 2; i++) {
2075 		/* It's possible for dlm_scand to remove an old rsb for
2076 		   this same resource from the toss list, for us to create
2077 		   a new one, look up the master locally, and find it
2078 		   already exists just before dlm_scand does the
2079 		   dir_remove() on the previous rsb. */
2080 
2081 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2082 				       r->res_length, &ret_nodeid);
2083 		if (!error)
2084 			break;
2085 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2086 		schedule();
2087 	}
2088 	if (error && error != -EEXIST)
2089 		return error;
2090 
2091 	if (ret_nodeid == our_nodeid) {
2092 		r->res_first_lkid = 0;
2093 		r->res_nodeid = 0;
2094 		lkb->lkb_nodeid = 0;
2095 	} else {
2096 		r->res_first_lkid = lkb->lkb_id;
2097 		r->res_nodeid = ret_nodeid;
2098 		lkb->lkb_nodeid = ret_nodeid;
2099 	}
2100 	return 0;
2101 }
2102 
2103 static void process_lookup_list(struct dlm_rsb *r)
2104 {
2105 	struct dlm_lkb *lkb, *safe;
2106 
2107 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2108 		list_del_init(&lkb->lkb_rsb_lookup);
2109 		_request_lock(r, lkb);
2110 		schedule();
2111 	}
2112 }
2113 
2114 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2115 
2116 static void confirm_master(struct dlm_rsb *r, int error)
2117 {
2118 	struct dlm_lkb *lkb;
2119 
2120 	if (!r->res_first_lkid)
2121 		return;
2122 
2123 	switch (error) {
2124 	case 0:
2125 	case -EINPROGRESS:
2126 		r->res_first_lkid = 0;
2127 		process_lookup_list(r);
2128 		break;
2129 
2130 	case -EAGAIN:
2131 	case -EBADR:
2132 	case -ENOTBLK:
2133 		/* the remote request failed and won't be retried (it was
2134 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2135 		   lkb the first_lkid */
2136 
2137 		r->res_first_lkid = 0;
2138 
2139 		if (!list_empty(&r->res_lookup)) {
2140 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2141 					 lkb_rsb_lookup);
2142 			list_del_init(&lkb->lkb_rsb_lookup);
2143 			r->res_first_lkid = lkb->lkb_id;
2144 			_request_lock(r, lkb);
2145 		}
2146 		break;
2147 
2148 	default:
2149 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2150 	}
2151 }
2152 
2153 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2154 			 int namelen, unsigned long timeout_cs,
2155 			 void (*ast) (void *astparam),
2156 			 void *astparam,
2157 			 void (*bast) (void *astparam, int mode),
2158 			 struct dlm_args *args)
2159 {
2160 	int rv = -EINVAL;
2161 
2162 	/* check for invalid arg usage */
2163 
2164 	if (mode < 0 || mode > DLM_LOCK_EX)
2165 		goto out;
2166 
2167 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2168 		goto out;
2169 
2170 	if (flags & DLM_LKF_CANCEL)
2171 		goto out;
2172 
2173 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2174 		goto out;
2175 
2176 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2177 		goto out;
2178 
2179 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2180 		goto out;
2181 
2182 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2183 		goto out;
2184 
2185 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2186 		goto out;
2187 
2188 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2189 		goto out;
2190 
2191 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2192 		goto out;
2193 
2194 	if (!ast || !lksb)
2195 		goto out;
2196 
2197 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2198 		goto out;
2199 
2200 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2201 		goto out;
2202 
2203 	/* these args will be copied to the lkb in validate_lock_args;
2204 	   it cannot be done now because when converting locks, fields in
2205 	   an active lkb cannot be modified before locking the rsb */
2206 
2207 	args->flags = flags;
2208 	args->astfn = ast;
2209 	args->astparam = astparam;
2210 	args->bastfn = bast;
2211 	args->timeout = timeout_cs;
2212 	args->mode = mode;
2213 	args->lksb = lksb;
2214 	rv = 0;
2215  out:
2216 	return rv;
2217 }
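
/*
 * Editor's sketch (not part of the original file): a flag combination
 * accepted by set_lock_args() above.  EXPEDITE requires a new NL request
 * and excludes CONVERT/QUECVT/NOQUEUE; the ast and args are hypothetical.
 */
static void example_ast(void *astparam)
{
	/* completion callback; a real caller would inspect its lksb */
}

static int example_expedited_args(struct dlm_lksb *lksb,
				  struct dlm_args *args)
{
	/* valid: expedited new NL request with an 8-byte resource name */
	return set_lock_args(DLM_LOCK_NL, lksb, DLM_LKF_EXPEDITE, 8, 0,
			     example_ast, NULL, NULL, args);
}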
2218 
2219 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2220 {
2221 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2222 		      DLM_LKF_FORCEUNLOCK))
2223 		return -EINVAL;
2224 
2225 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2226 		return -EINVAL;
2227 
2228 	args->flags = flags;
2229 	args->astparam = astarg;
2230 	return 0;
2231 }
2232 
2233 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2234 			      struct dlm_args *args)
2235 {
2236 	int rv = -EINVAL;
2237 
2238 	if (args->flags & DLM_LKF_CONVERT) {
2239 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2240 			goto out;
2241 
2242 		if (args->flags & DLM_LKF_QUECVT &&
2243 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2244 			goto out;
2245 
2246 		rv = -EBUSY;
2247 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2248 			goto out;
2249 
2250 		if (lkb->lkb_wait_type)
2251 			goto out;
2252 
2253 		if (is_overlap(lkb))
2254 			goto out;
2255 	}
2256 
2257 	lkb->lkb_exflags = args->flags;
2258 	lkb->lkb_sbflags = 0;
2259 	lkb->lkb_astfn = args->astfn;
2260 	lkb->lkb_astparam = args->astparam;
2261 	lkb->lkb_bastfn = args->bastfn;
2262 	lkb->lkb_rqmode = args->mode;
2263 	lkb->lkb_lksb = args->lksb;
2264 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2265 	lkb->lkb_ownpid = (int) current->pid;
2266 	lkb->lkb_timeout_cs = args->timeout;
2267 	rv = 0;
2268  out:
2269 	if (rv)
2270 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2271 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2272 			  lkb->lkb_status, lkb->lkb_wait_type,
2273 			  lkb->lkb_resource->res_name);
2274 	return rv;
2275 }
2276 
2277 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2278    for success */
2279 
2280 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2281    because there may be a lookup in progress and it's valid to do
2282    cancel/unlockf on it */
2283 
2284 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2285 {
2286 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2287 	int rv = -EINVAL;
2288 
2289 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2290 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2291 		dlm_print_lkb(lkb);
2292 		goto out;
2293 	}
2294 
2295 	/* an lkb may still exist even though the lock is EOL'ed due to a
2296 	   cancel, unlock or failed noqueue request; an app can't use these
2297 	   locks; return same error as if the lkid had not been found at all */
2298 
2299 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2300 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2301 		rv = -ENOENT;
2302 		goto out;
2303 	}
2304 
2305 	/* an lkb may be waiting for an rsb lookup to complete where the
2306 	   lookup was initiated by another lock */
2307 
2308 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2309 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2310 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2311 			list_del_init(&lkb->lkb_rsb_lookup);
2312 			queue_cast(lkb->lkb_resource, lkb,
2313 				   args->flags & DLM_LKF_CANCEL ?
2314 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2315 			unhold_lkb(lkb); /* undoes create_lkb() */
2316 		}
2317 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2318 		rv = -EBUSY;
2319 		goto out;
2320 	}
2321 
2322 	/* cancel not allowed with another cancel/unlock in progress */
2323 
2324 	if (args->flags & DLM_LKF_CANCEL) {
2325 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2326 			goto out;
2327 
2328 		if (is_overlap(lkb))
2329 			goto out;
2330 
2331 		/* don't let scand try to do a cancel */
2332 		del_timeout(lkb);
2333 
2334 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2335 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2336 			rv = -EBUSY;
2337 			goto out;
2338 		}
2339 
2340 		/* there's nothing to cancel */
2341 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2342 		    !lkb->lkb_wait_type) {
2343 			rv = -EBUSY;
2344 			goto out;
2345 		}
2346 
2347 		switch (lkb->lkb_wait_type) {
2348 		case DLM_MSG_LOOKUP:
2349 		case DLM_MSG_REQUEST:
2350 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2351 			rv = -EBUSY;
2352 			goto out;
2353 		case DLM_MSG_UNLOCK:
2354 		case DLM_MSG_CANCEL:
2355 			goto out;
2356 		}
2357 		/* add_to_waiters() will set OVERLAP_CANCEL */
2358 		goto out_ok;
2359 	}
2360 
2361 	/* do we need to allow a force-unlock if there's a normal unlock
2362 	   already in progress?  in what conditions could the normal unlock
2363 	   fail such that we'd want to send a force-unlock to be sure? */
2364 
2365 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2366 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2367 			goto out;
2368 
2369 		if (is_overlap_unlock(lkb))
2370 			goto out;
2371 
2372 		/* don't let scand try to do a cancel */
2373 		del_timeout(lkb);
2374 
2375 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2376 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2377 			rv = -EBUSY;
2378 			goto out;
2379 		}
2380 
2381 		switch (lkb->lkb_wait_type) {
2382 		case DLM_MSG_LOOKUP:
2383 		case DLM_MSG_REQUEST:
2384 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2385 			rv = -EBUSY;
2386 			goto out;
2387 		case DLM_MSG_UNLOCK:
2388 			goto out;
2389 		}
2390 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2391 		goto out_ok;
2392 	}
2393 
2394 	/* normal unlock not allowed if there's any op in progress */
2395 	rv = -EBUSY;
2396 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2397 		goto out;
2398 
2399  out_ok:
2400 	/* an overlapping op shouldn't blow away exflags from the other op */
2401 	lkb->lkb_exflags |= args->flags;
2402 	lkb->lkb_sbflags = 0;
2403 	lkb->lkb_astparam = args->astparam;
2404 	rv = 0;
2405  out:
2406 	if (rv)
2407 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2408 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2409 			  args->flags, lkb->lkb_wait_type,
2410 			  lkb->lkb_resource->res_name);
2411 	return rv;
2412 }
2413 
2414 /*
2415  * Four stage 4 varieties:
2416  * do_request(), do_convert(), do_unlock(), do_cancel()
2417  * These are called on the master node for the given lock and
2418  * from the central locking logic.
2419  */
2420 
2421 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2422 {
2423 	int error = 0;
2424 
2425 	if (can_be_granted(r, lkb, 1, NULL)) {
2426 		grant_lock(r, lkb);
2427 		queue_cast(r, lkb, 0);
2428 		goto out;
2429 	}
2430 
2431 	if (can_be_queued(lkb)) {
2432 		error = -EINPROGRESS;
2433 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2434 		add_timeout(lkb);
2435 		goto out;
2436 	}
2437 
2438 	error = -EAGAIN;
2439 	queue_cast(r, lkb, -EAGAIN);
2440  out:
2441 	return error;
2442 }
2443 
2444 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2445 			       int error)
2446 {
2447 	switch (error) {
2448 	case -EAGAIN:
2449 		if (force_blocking_asts(lkb))
2450 			send_blocking_asts_all(r, lkb);
2451 		break;
2452 	case -EINPROGRESS:
2453 		send_blocking_asts(r, lkb);
2454 		break;
2455 	}
2456 }
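
/*
 * Editor's note (not part of the original file): do_request() returns
 * 0 (granted now), -EINPROGRESS (queued on res_waitqueue) or -EAGAIN
 * (a NOQUEUE request that would block), and do_request_effects() turns
 * the latter two into blocking asts.  A hedged restatement of the local
 * call sequence from _request_lock() below:
 */
static int example_local_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = do_request(r, lkb);

	/* for a remote caller, send_request_reply() goes here */
	do_request_effects(r, lkb, error);
	return error;
}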
2457 
2458 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2459 {
2460 	int error = 0;
2461 	int deadlk = 0;
2462 
2463 	/* changing an existing lock may allow others to be granted */
2464 
2465 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2466 		grant_lock(r, lkb);
2467 		queue_cast(r, lkb, 0);
2468 		goto out;
2469 	}
2470 
2471 	/* can_be_granted() detected that this lock would block in a conversion
2472 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2473 	   the ast for the convert. */
2474 
2475 	if (deadlk) {
2476 		/* it's left on the granted queue */
2477 		revert_lock(r, lkb);
2478 		queue_cast(r, lkb, -EDEADLK);
2479 		error = -EDEADLK;
2480 		goto out;
2481 	}
2482 
2483 	/* is_demoted() means the can_be_granted() above set the grmode
2484 	   to NL, and left us on the granted queue.  This auto-demotion
2485 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2486 	   now grantable.  We have to try to grant other converting locks
2487 	   before we try again to grant this one. */
2488 
2489 	if (is_demoted(lkb)) {
2490 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2491 		if (_can_be_granted(r, lkb, 1)) {
2492 			grant_lock(r, lkb);
2493 			queue_cast(r, lkb, 0);
2494 			goto out;
2495 		}
2496 		/* else fall through and move to convert queue */
2497 	}
2498 
2499 	if (can_be_queued(lkb)) {
2500 		error = -EINPROGRESS;
2501 		del_lkb(r, lkb);
2502 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2503 		add_timeout(lkb);
2504 		goto out;
2505 	}
2506 
2507 	error = -EAGAIN;
2508 	queue_cast(r, lkb, -EAGAIN);
2509  out:
2510 	return error;
2511 }
2512 
2513 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2514 			       int error)
2515 {
2516 	switch (error) {
2517 	case 0:
2518 		grant_pending_locks(r);
2519 		/* grant_pending_locks also sends basts */
2520 		break;
2521 	case -EAGAIN:
2522 		if (force_blocking_asts(lkb))
2523 			send_blocking_asts_all(r, lkb);
2524 		break;
2525 	case -EINPROGRESS:
2526 		send_blocking_asts(r, lkb);
2527 		break;
2528 	}
2529 }
2530 
2531 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 {
2533 	remove_lock(r, lkb);
2534 	queue_cast(r, lkb, -DLM_EUNLOCK);
2535 	return -DLM_EUNLOCK;
2536 }
2537 
2538 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2539 			      int error)
2540 {
2541 	grant_pending_locks(r);
2542 }
2543 
2544 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2545 
2546 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2547 {
2548 	int error;
2549 
2550 	error = revert_lock(r, lkb);
2551 	if (error) {
2552 		queue_cast(r, lkb, -DLM_ECANCEL);
2553 		return -DLM_ECANCEL;
2554 	}
2555 	return 0;
2556 }
2557 
2558 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2559 			      int error)
2560 {
2561 	if (error)
2562 		grant_pending_locks(r);
2563 }
2564 
2565 /*
2566  * Four stage 3 varieties:
2567  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2568  */
2569 
2570 /* add a new lkb to a possibly new rsb, called by requesting process */
2571 
2572 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2573 {
2574 	int error;
2575 
2576 	/* set_master: sets lkb nodeid from r */
2577 
2578 	error = set_master(r, lkb);
2579 	if (error < 0)
2580 		goto out;
2581 	if (error) {
2582 		error = 0;
2583 		goto out;
2584 	}
2585 
2586 	if (is_remote(r)) {
2587 		/* receive_request() calls do_request() on remote node */
2588 		error = send_request(r, lkb);
2589 	} else {
2590 		error = do_request(r, lkb);
2591 		/* for remote locks the request_reply is sent
2592 		   between do_request and do_request_effects */
2593 		do_request_effects(r, lkb, error);
2594 	}
2595  out:
2596 	return error;
2597 }
2598 
2599 /* change some property of an existing lkb, e.g. mode */
2600 
2601 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2602 {
2603 	int error;
2604 
2605 	if (is_remote(r)) {
2606 		/* receive_convert() calls do_convert() on remote node */
2607 		error = send_convert(r, lkb);
2608 	} else {
2609 		error = do_convert(r, lkb);
2610 		/* for remote locks the convert_reply is sent
2611 		   between do_convert and do_convert_effects */
2612 		do_convert_effects(r, lkb, error);
2613 	}
2614 
2615 	return error;
2616 }
2617 
2618 /* remove an existing lkb from the granted queue */
2619 
2620 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2621 {
2622 	int error;
2623 
2624 	if (is_remote(r)) {
2625 		/* receive_unlock() calls do_unlock() on remote node */
2626 		error = send_unlock(r, lkb);
2627 	} else {
2628 		error = do_unlock(r, lkb);
2629 		/* for remote locks the unlock_reply is sent
2630 		   between do_unlock and do_unlock_effects */
2631 		do_unlock_effects(r, lkb, error);
2632 	}
2633 
2634 	return error;
2635 }
2636 
2637 /* remove an existing lkb from the convert or wait queue */
2638 
2639 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641 	int error;
2642 
2643 	if (is_remote(r)) {
2644 		/* receive_cancel() calls do_cancel() on remote node */
2645 		error = send_cancel(r, lkb);
2646 	} else {
2647 		error = do_cancel(r, lkb);
2648 		/* for remote locks the cancel_reply is sent
2649 		   between do_cancel and do_cancel_effects */
2650 		do_cancel_effects(r, lkb, error);
2651 	}
2652 
2653 	return error;
2654 }
2655 
2656 /*
2657  * Four stage 2 varieties:
2658  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2659  */
2660 
2661 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2662 			int len, struct dlm_args *args)
2663 {
2664 	struct dlm_rsb *r;
2665 	int error;
2666 
2667 	error = validate_lock_args(ls, lkb, args);
2668 	if (error)
2669 		goto out;
2670 
2671 	error = find_rsb(ls, name, len, R_CREATE, &r);
2672 	if (error)
2673 		goto out;
2674 
2675 	lock_rsb(r);
2676 
2677 	attach_lkb(r, lkb);
2678 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2679 
2680 	error = _request_lock(r, lkb);
2681 
2682 	unlock_rsb(r);
2683 	put_rsb(r);
2684 
2685  out:
2686 	return error;
2687 }
2688 
2689 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2690 			struct dlm_args *args)
2691 {
2692 	struct dlm_rsb *r;
2693 	int error;
2694 
2695 	r = lkb->lkb_resource;
2696 
2697 	hold_rsb(r);
2698 	lock_rsb(r);
2699 
2700 	error = validate_lock_args(ls, lkb, args);
2701 	if (error)
2702 		goto out;
2703 
2704 	error = _convert_lock(r, lkb);
2705  out:
2706 	unlock_rsb(r);
2707 	put_rsb(r);
2708 	return error;
2709 }
2710 
2711 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2712 		       struct dlm_args *args)
2713 {
2714 	struct dlm_rsb *r;
2715 	int error;
2716 
2717 	r = lkb->lkb_resource;
2718 
2719 	hold_rsb(r);
2720 	lock_rsb(r);
2721 
2722 	error = validate_unlock_args(lkb, args);
2723 	if (error)
2724 		goto out;
2725 
2726 	error = _unlock_lock(r, lkb);
2727  out:
2728 	unlock_rsb(r);
2729 	put_rsb(r);
2730 	return error;
2731 }
2732 
2733 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2734 		       struct dlm_args *args)
2735 {
2736 	struct dlm_rsb *r;
2737 	int error;
2738 
2739 	r = lkb->lkb_resource;
2740 
2741 	hold_rsb(r);
2742 	lock_rsb(r);
2743 
2744 	error = validate_unlock_args(lkb, args);
2745 	if (error)
2746 		goto out;
2747 
2748 	error = _cancel_lock(r, lkb);
2749  out:
2750 	unlock_rsb(r);
2751 	put_rsb(r);
2752 	return error;
2753 }
2754 
2755 /*
2756  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2757  */
2758 
2759 int dlm_lock(dlm_lockspace_t *lockspace,
2760 	     int mode,
2761 	     struct dlm_lksb *lksb,
2762 	     uint32_t flags,
2763 	     void *name,
2764 	     unsigned int namelen,
2765 	     uint32_t parent_lkid,
2766 	     void (*ast) (void *astarg),
2767 	     void *astarg,
2768 	     void (*bast) (void *astarg, int mode))
2769 {
2770 	struct dlm_ls *ls;
2771 	struct dlm_lkb *lkb;
2772 	struct dlm_args args;
2773 	int error, convert = flags & DLM_LKF_CONVERT;
2774 
2775 	ls = dlm_find_lockspace_local(lockspace);
2776 	if (!ls)
2777 		return -EINVAL;
2778 
2779 	dlm_lock_recovery(ls);
2780 
2781 	if (convert)
2782 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2783 	else
2784 		error = create_lkb(ls, &lkb);
2785 
2786 	if (error)
2787 		goto out;
2788 
2789 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2790 			      astarg, bast, &args);
2791 	if (error)
2792 		goto out_put;
2793 
2794 	if (convert)
2795 		error = convert_lock(ls, lkb, &args);
2796 	else
2797 		error = request_lock(ls, lkb, name, namelen, &args);
2798 
2799 	if (error == -EINPROGRESS)
2800 		error = 0;
2801  out_put:
2802 	if (convert || error)
2803 		__put_lkb(ls, lkb);
2804 	if (error == -EAGAIN || error == -EDEADLK)
2805 		error = 0;
2806  out:
2807 	dlm_unlock_recovery(ls);
2808 	dlm_put_lockspace(ls);
2809 	return error;
2810 }
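
/*
 * Editor's sketch (not part of the original file): minimal in-kernel use
 * of dlm_lock().  The lockspace, lksb and resource name are hypothetical;
 * a real caller gets the lockspace from dlm_new_lockspace().  Note that
 * 0 from dlm_lock() only means the request was accepted -- the result
 * arrives later in the completion ast via lksb->sb_status.
 */
static void example_cast(void *astparam)
{
	struct dlm_lksb *lksb = astparam;

	/* sb_status: 0 on grant, -EAGAIN for a failed NOQUEUE request,
	   -EDEADLK for a deadlocking conversion, etc. */
	if (lksb->sb_status)
		log_print("example lock failed %d", lksb->sb_status);
}

static int example_request_ex(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	return dlm_lock(ls, DLM_LOCK_EX, lksb, 0, "example_res",
			sizeof("example_res") - 1, 0, example_cast, lksb,
			NULL);
}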
2811 
2812 int dlm_unlock(dlm_lockspace_t *lockspace,
2813 	       uint32_t lkid,
2814 	       uint32_t flags,
2815 	       struct dlm_lksb *lksb,
2816 	       void *astarg)
2817 {
2818 	struct dlm_ls *ls;
2819 	struct dlm_lkb *lkb;
2820 	struct dlm_args args;
2821 	int error;
2822 
2823 	ls = dlm_find_lockspace_local(lockspace);
2824 	if (!ls)
2825 		return -EINVAL;
2826 
2827 	dlm_lock_recovery(ls);
2828 
2829 	error = find_lkb(ls, lkid, &lkb);
2830 	if (error)
2831 		goto out;
2832 
2833 	error = set_unlock_args(flags, astarg, &args);
2834 	if (error)
2835 		goto out_put;
2836 
2837 	if (flags & DLM_LKF_CANCEL)
2838 		error = cancel_lock(ls, lkb, &args);
2839 	else
2840 		error = unlock_lock(ls, lkb, &args);
2841 
2842 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2843 		error = 0;
2844 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2845 		error = 0;
2846  out_put:
2847 	dlm_put_lkb(lkb);
2848  out:
2849 	dlm_unlock_recovery(ls);
2850 	dlm_put_lockspace(ls);
2851 	return error;
2852 }
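
/*
 * Editor's sketch (not part of the original file): releasing or canceling
 * a lock through dlm_unlock().  A cancel is just an unlock with
 * DLM_LKF_CANCEL; the final status reaches the ast as -DLM_EUNLOCK or
 * -DLM_ECANCEL, while 0 from dlm_unlock() means the request was accepted.
 */
static int example_release(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	/* ordinary unlock of the lock identified by sb_lkid */
	return dlm_unlock(ls, lksb->sb_lkid, 0, lksb, lksb);
}

static int example_cancel(dlm_lockspace_t *ls, struct dlm_lksb *lksb)
{
	/* cancel a request still sitting on the wait/convert queue */
	return dlm_unlock(ls, lksb->sb_lkid, DLM_LKF_CANCEL, lksb, lksb);
}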
2853 
2854 /*
2855  * send/receive routines for remote operations and replies
2856  *
2857  * send_args
2858  * send_common
2859  * send_request			receive_request
2860  * send_convert			receive_convert
2861  * send_unlock			receive_unlock
2862  * send_cancel			receive_cancel
2863  * send_grant			receive_grant
2864  * send_bast			receive_bast
2865  * send_lookup			receive_lookup
2866  * send_remove			receive_remove
2867  *
2868  * 				send_common_reply
2869  * receive_request_reply	send_request_reply
2870  * receive_convert_reply	send_convert_reply
2871  * receive_unlock_reply		send_unlock_reply
2872  * receive_cancel_reply		send_cancel_reply
2873  * receive_lookup_reply		send_lookup_reply
2874  */
2875 
2876 static int _create_message(struct dlm_ls *ls, int mb_len,
2877 			   int to_nodeid, int mstype,
2878 			   struct dlm_message **ms_ret,
2879 			   struct dlm_mhandle **mh_ret)
2880 {
2881 	struct dlm_message *ms;
2882 	struct dlm_mhandle *mh;
2883 	char *mb;
2884 
2885 	/* get_buffer gives us a message handle (mh) that we need to
2886 	   pass into lowcomms_commit and a message buffer (mb) that we
2887 	   write our data into */
2888 
2889 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2890 	if (!mh)
2891 		return -ENOBUFS;
2892 
2893 	memset(mb, 0, mb_len);
2894 
2895 	ms = (struct dlm_message *) mb;
2896 
2897 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2898 	ms->m_header.h_lockspace = ls->ls_global_id;
2899 	ms->m_header.h_nodeid = dlm_our_nodeid();
2900 	ms->m_header.h_length = mb_len;
2901 	ms->m_header.h_cmd = DLM_MSG;
2902 
2903 	ms->m_type = mstype;
2904 
2905 	*mh_ret = mh;
2906 	*ms_ret = ms;
2907 	return 0;
2908 }
2909 
2910 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2911 			  int to_nodeid, int mstype,
2912 			  struct dlm_message **ms_ret,
2913 			  struct dlm_mhandle **mh_ret)
2914 {
2915 	int mb_len = sizeof(struct dlm_message);
2916 
2917 	switch (mstype) {
2918 	case DLM_MSG_REQUEST:
2919 	case DLM_MSG_LOOKUP:
2920 	case DLM_MSG_REMOVE:
2921 		mb_len += r->res_length;
2922 		break;
2923 	case DLM_MSG_CONVERT:
2924 	case DLM_MSG_UNLOCK:
2925 	case DLM_MSG_REQUEST_REPLY:
2926 	case DLM_MSG_CONVERT_REPLY:
2927 	case DLM_MSG_GRANT:
2928 		if (lkb && lkb->lkb_lvbptr)
2929 			mb_len += r->res_ls->ls_lvblen;
2930 		break;
2931 	}
2932 
2933 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2934 			       ms_ret, mh_ret);
2935 }
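
/*
 * Editor's note (not part of the original file): the variable-length
 * tail sized by create_message() above travels after the fixed header,
 * so the receiver can recover its length from h_length alone -- which is
 * what receive_extralen() does further below.  A hedged restatement:
 */
static int example_request_msg_len(struct dlm_rsb *r)
{
	/* e.g. a DLM_MSG_REQUEST for the 3-byte name "foo" occupies
	   sizeof(struct dlm_message) + 3 bytes on the wire */
	return sizeof(struct dlm_message) + r->res_length;
}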
2936 
2937 /* further lowcomms enhancements or alternate implementations may make
2938    the return value from this function useful at some point */
2939 
2940 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2941 {
2942 	dlm_message_out(ms);
2943 	dlm_lowcomms_commit_buffer(mh);
2944 	return 0;
2945 }
2946 
2947 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2948 		      struct dlm_message *ms)
2949 {
2950 	ms->m_nodeid   = lkb->lkb_nodeid;
2951 	ms->m_pid      = lkb->lkb_ownpid;
2952 	ms->m_lkid     = lkb->lkb_id;
2953 	ms->m_remid    = lkb->lkb_remid;
2954 	ms->m_exflags  = lkb->lkb_exflags;
2955 	ms->m_sbflags  = lkb->lkb_sbflags;
2956 	ms->m_flags    = lkb->lkb_flags;
2957 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2958 	ms->m_status   = lkb->lkb_status;
2959 	ms->m_grmode   = lkb->lkb_grmode;
2960 	ms->m_rqmode   = lkb->lkb_rqmode;
2961 	ms->m_hash     = r->res_hash;
2962 
2963 	/* m_result and m_bastmode are set from function args,
2964 	   not from lkb fields */
2965 
2966 	if (lkb->lkb_bastfn)
2967 		ms->m_asts |= DLM_CB_BAST;
2968 	if (lkb->lkb_astfn)
2969 		ms->m_asts |= DLM_CB_CAST;
2970 
2971 	/* compare with switch in create_message; send_remove() doesn't
2972 	   use send_args() */
2973 
2974 	switch (ms->m_type) {
2975 	case DLM_MSG_REQUEST:
2976 	case DLM_MSG_LOOKUP:
2977 		memcpy(ms->m_extra, r->res_name, r->res_length);
2978 		break;
2979 	case DLM_MSG_CONVERT:
2980 	case DLM_MSG_UNLOCK:
2981 	case DLM_MSG_REQUEST_REPLY:
2982 	case DLM_MSG_CONVERT_REPLY:
2983 	case DLM_MSG_GRANT:
2984 		if (!lkb->lkb_lvbptr)
2985 			break;
2986 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2987 		break;
2988 	}
2989 }
2990 
2991 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2992 {
2993 	struct dlm_message *ms;
2994 	struct dlm_mhandle *mh;
2995 	int to_nodeid, error;
2996 
2997 	to_nodeid = r->res_nodeid;
2998 
2999 	error = add_to_waiters(lkb, mstype, to_nodeid);
3000 	if (error)
3001 		return error;
3002 
3003 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3004 	if (error)
3005 		goto fail;
3006 
3007 	send_args(r, lkb, ms);
3008 
3009 	error = send_message(mh, ms);
3010 	if (error)
3011 		goto fail;
3012 	return 0;
3013 
3014  fail:
3015 	remove_from_waiters(lkb, msg_reply_type(mstype));
3016 	return error;
3017 }
3018 
3019 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3020 {
3021 	return send_common(r, lkb, DLM_MSG_REQUEST);
3022 }
3023 
3024 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3025 {
3026 	int error;
3027 
3028 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3029 
3030 	/* down conversions go without a reply from the master */
3031 	if (!error && down_conversion(lkb)) {
3032 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3033 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3034 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3035 		r->res_ls->ls_stub_ms.m_result = 0;
3036 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3037 	}
3038 
3039 	return error;
3040 }
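
/*
 * Editor's note (not part of the original file): the stub reply above
 * lets a down-conversion complete without waiting on the network -- the
 * master always grants a conversion to a less restrictive mode, so the
 * local node fabricates the reply itself.  A hedged restatement of the
 * down_conversion() test (modes are ordered NL < CR < CW < PR < PW < EX
 * numerically):
 */
static bool example_is_down_conversion(struct dlm_lkb *lkb)
{
	/* requested mode less restrictive than the granted one,
	   e.g. converting EX down to PR */
	return lkb->lkb_rqmode < lkb->lkb_grmode;
}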
3041 
3042 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3043    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3044    that the master is still correct. */
3045 
3046 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3047 {
3048 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3049 }
3050 
3051 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3052 {
3053 	return send_common(r, lkb, DLM_MSG_CANCEL);
3054 }
3055 
3056 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3057 {
3058 	struct dlm_message *ms;
3059 	struct dlm_mhandle *mh;
3060 	int to_nodeid, error;
3061 
3062 	to_nodeid = lkb->lkb_nodeid;
3063 
3064 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3065 	if (error)
3066 		goto out;
3067 
3068 	send_args(r, lkb, ms);
3069 
3070 	ms->m_result = 0;
3071 
3072 	error = send_message(mh, ms);
3073  out:
3074 	return error;
3075 }
3076 
3077 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3078 {
3079 	struct dlm_message *ms;
3080 	struct dlm_mhandle *mh;
3081 	int to_nodeid, error;
3082 
3083 	to_nodeid = lkb->lkb_nodeid;
3084 
3085 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3086 	if (error)
3087 		goto out;
3088 
3089 	send_args(r, lkb, ms);
3090 
3091 	ms->m_bastmode = mode;
3092 
3093 	error = send_message(mh, ms);
3094  out:
3095 	return error;
3096 }
3097 
3098 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3099 {
3100 	struct dlm_message *ms;
3101 	struct dlm_mhandle *mh;
3102 	int to_nodeid, error;
3103 
3104 	to_nodeid = dlm_dir_nodeid(r);
3105 
3106 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3107 	if (error)
3108 		return error;
3109 
3110 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3111 	if (error)
3112 		goto fail;
3113 
3114 	send_args(r, lkb, ms);
3115 
3116 	error = send_message(mh, ms);
3117 	if (error)
3118 		goto fail;
3119 	return 0;
3120 
3121  fail:
3122 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3123 	return error;
3124 }
3125 
3126 static int send_remove(struct dlm_rsb *r)
3127 {
3128 	struct dlm_message *ms;
3129 	struct dlm_mhandle *mh;
3130 	int to_nodeid, error;
3131 
3132 	to_nodeid = dlm_dir_nodeid(r);
3133 
3134 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3135 	if (error)
3136 		goto out;
3137 
3138 	memcpy(ms->m_extra, r->res_name, r->res_length);
3139 	ms->m_hash = r->res_hash;
3140 
3141 	error = send_message(mh, ms);
3142  out:
3143 	return error;
3144 }
3145 
3146 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3147 			     int mstype, int rv)
3148 {
3149 	struct dlm_message *ms;
3150 	struct dlm_mhandle *mh;
3151 	int to_nodeid, error;
3152 
3153 	to_nodeid = lkb->lkb_nodeid;
3154 
3155 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3156 	if (error)
3157 		goto out;
3158 
3159 	send_args(r, lkb, ms);
3160 
3161 	ms->m_result = rv;
3162 
3163 	error = send_message(mh, ms);
3164  out:
3165 	return error;
3166 }
3167 
3168 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3169 {
3170 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3171 }
3172 
3173 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3174 {
3175 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3176 }
3177 
3178 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3179 {
3180 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3181 }
3182 
3183 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3184 {
3185 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3186 }
3187 
3188 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3189 			     int ret_nodeid, int rv)
3190 {
3191 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3192 	struct dlm_message *ms;
3193 	struct dlm_mhandle *mh;
3194 	int error, nodeid = ms_in->m_header.h_nodeid;
3195 
3196 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3197 	if (error)
3198 		goto out;
3199 
3200 	ms->m_lkid = ms_in->m_lkid;
3201 	ms->m_result = rv;
3202 	ms->m_nodeid = ret_nodeid;
3203 
3204 	error = send_message(mh, ms);
3205  out:
3206 	return error;
3207 }
3208 
3209 /* which args we save from a received message depends heavily on the type
3210    of message, unlike the send side where we can safely send everything about
3211    the lkb for any type of message */
3212 
3213 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3214 {
3215 	lkb->lkb_exflags = ms->m_exflags;
3216 	lkb->lkb_sbflags = ms->m_sbflags;
3217 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3218 		         (ms->m_flags & 0x0000FFFF);
3219 }
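
/*
 * Editor's note (not part of the original file): lkb_flags keeps
 * node-local DLM_IFL_* state in the upper 16 bits, so only the lower 16
 * bits are taken from the wire above.  A minimal sketch of that merge:
 */
static uint32_t example_merge_flags(uint32_t local, uint32_t wire)
{
	/* preserve local-only upper half, accept lower half from msg */
	return (local & 0xFFFF0000) | (wire & 0x0000FFFF);
}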
3220 
3221 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3222 {
3223 	if (ms->m_flags == DLM_IFL_STUB_MS)
3224 		return;
3225 
3226 	lkb->lkb_sbflags = ms->m_sbflags;
3227 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3228 		         (ms->m_flags & 0x0000FFFF);
3229 }
3230 
3231 static int receive_extralen(struct dlm_message *ms)
3232 {
3233 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3234 }
3235 
3236 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3237 		       struct dlm_message *ms)
3238 {
3239 	int len;
3240 
3241 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3242 		if (!lkb->lkb_lvbptr)
3243 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3244 		if (!lkb->lkb_lvbptr)
3245 			return -ENOMEM;
3246 		len = receive_extralen(ms);
3247 		if (len > DLM_RESNAME_MAXLEN)
3248 			len = DLM_RESNAME_MAXLEN;
3249 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3250 	}
3251 	return 0;
3252 }
3253 
3254 static void fake_bastfn(void *astparam, int mode)
3255 {
3256 	log_print("fake_bastfn should not be called");
3257 }
3258 
3259 static void fake_astfn(void *astparam)
3260 {
3261 	log_print("fake_astfn should not be called");
3262 }
3263 
3264 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3265 				struct dlm_message *ms)
3266 {
3267 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3268 	lkb->lkb_ownpid = ms->m_pid;
3269 	lkb->lkb_remid = ms->m_lkid;
3270 	lkb->lkb_grmode = DLM_LOCK_IV;
3271 	lkb->lkb_rqmode = ms->m_rqmode;
3272 
3273 	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3274 	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3275 
3276 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3277 		/* lkb was just created so there won't be an lvb yet */
3278 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3279 		if (!lkb->lkb_lvbptr)
3280 			return -ENOMEM;
3281 	}
3282 
3283 	return 0;
3284 }
3285 
3286 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3287 				struct dlm_message *ms)
3288 {
3289 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3290 		return -EBUSY;
3291 
3292 	if (receive_lvb(ls, lkb, ms))
3293 		return -ENOMEM;
3294 
3295 	lkb->lkb_rqmode = ms->m_rqmode;
3296 	lkb->lkb_lvbseq = ms->m_lvbseq;
3297 
3298 	return 0;
3299 }
3300 
3301 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3302 			       struct dlm_message *ms)
3303 {
3304 	if (receive_lvb(ls, lkb, ms))
3305 		return -ENOMEM;
3306 	return 0;
3307 }
3308 
3309 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3310    uses to send a reply and that the remote end uses to process the reply. */
3311 
3312 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3313 {
3314 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3315 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3316 	lkb->lkb_remid = ms->m_lkid;
3317 }
3318 
3319 /* This is called after the rsb is locked so that we can safely inspect
3320    fields in the lkb. */
3321 
3322 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3323 {
3324 	int from = ms->m_header.h_nodeid;
3325 	int error = 0;
3326 
3327 	switch (ms->m_type) {
3328 	case DLM_MSG_CONVERT:
3329 	case DLM_MSG_UNLOCK:
3330 	case DLM_MSG_CANCEL:
3331 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3332 			error = -EINVAL;
3333 		break;
3334 
3335 	case DLM_MSG_CONVERT_REPLY:
3336 	case DLM_MSG_UNLOCK_REPLY:
3337 	case DLM_MSG_CANCEL_REPLY:
3338 	case DLM_MSG_GRANT:
3339 	case DLM_MSG_BAST:
3340 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3341 			error = -EINVAL;
3342 		break;
3343 
3344 	case DLM_MSG_REQUEST_REPLY:
3345 		if (!is_process_copy(lkb))
3346 			error = -EINVAL;
3347 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3348 			error = -EINVAL;
3349 		break;
3350 
3351 	default:
3352 		error = -EINVAL;
3353 	}
3354 
3355 	if (error)
3356 		log_error(lkb->lkb_resource->res_ls,
3357 			  "ignore invalid message %d from %d %x %x %x %d",
3358 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3359 			  lkb->lkb_flags, lkb->lkb_nodeid);
3360 	return error;
3361 }
3362 
3363 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3364 {
3365 	struct dlm_lkb *lkb;
3366 	struct dlm_rsb *r;
3367 	int error, namelen;
3368 
3369 	error = create_lkb(ls, &lkb);
3370 	if (error)
3371 		goto fail;
3372 
3373 	receive_flags(lkb, ms);
3374 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3375 	error = receive_request_args(ls, lkb, ms);
3376 	if (error) {
3377 		__put_lkb(ls, lkb);
3378 		goto fail;
3379 	}
3380 
3381 	namelen = receive_extralen(ms);
3382 
3383 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3384 	if (error) {
3385 		__put_lkb(ls, lkb);
3386 		goto fail;
3387 	}
3388 
3389 	lock_rsb(r);
3390 
3391 	attach_lkb(r, lkb);
3392 	error = do_request(r, lkb);
3393 	send_request_reply(r, lkb, error);
3394 	do_request_effects(r, lkb, error);
3395 
3396 	unlock_rsb(r);
3397 	put_rsb(r);
3398 
3399 	if (error == -EINPROGRESS)
3400 		error = 0;
3401 	if (error)
3402 		dlm_put_lkb(lkb);
3403 	return;
3404 
3405  fail:
3406 	setup_stub_lkb(ls, ms);
3407 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3408 }
3409 
3410 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3411 {
3412 	struct dlm_lkb *lkb;
3413 	struct dlm_rsb *r;
3414 	int error, reply = 1;
3415 
3416 	error = find_lkb(ls, ms->m_remid, &lkb);
3417 	if (error)
3418 		goto fail;
3419 
3420 	r = lkb->lkb_resource;
3421 
3422 	hold_rsb(r);
3423 	lock_rsb(r);
3424 
3425 	error = validate_message(lkb, ms);
3426 	if (error)
3427 		goto out;
3428 
3429 	receive_flags(lkb, ms);
3430 
3431 	error = receive_convert_args(ls, lkb, ms);
3432 	if (error) {
3433 		send_convert_reply(r, lkb, error);
3434 		goto out;
3435 	}
3436 
3437 	reply = !down_conversion(lkb);
3438 
3439 	error = do_convert(r, lkb);
3440 	if (reply)
3441 		send_convert_reply(r, lkb, error);
3442 	do_convert_effects(r, lkb, error);
3443  out:
3444 	unlock_rsb(r);
3445 	put_rsb(r);
3446 	dlm_put_lkb(lkb);
3447 	return;
3448 
3449  fail:
3450 	setup_stub_lkb(ls, ms);
3451 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3452 }
3453 
3454 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3455 {
3456 	struct dlm_lkb *lkb;
3457 	struct dlm_rsb *r;
3458 	int error;
3459 
3460 	error = find_lkb(ls, ms->m_remid, &lkb);
3461 	if (error)
3462 		goto fail;
3463 
3464 	r = lkb->lkb_resource;
3465 
3466 	hold_rsb(r);
3467 	lock_rsb(r);
3468 
3469 	error = validate_message(lkb, ms);
3470 	if (error)
3471 		goto out;
3472 
3473 	receive_flags(lkb, ms);
3474 
3475 	error = receive_unlock_args(ls, lkb, ms);
3476 	if (error) {
3477 		send_unlock_reply(r, lkb, error);
3478 		goto out;
3479 	}
3480 
3481 	error = do_unlock(r, lkb);
3482 	send_unlock_reply(r, lkb, error);
3483 	do_unlock_effects(r, lkb, error);
3484  out:
3485 	unlock_rsb(r);
3486 	put_rsb(r);
3487 	dlm_put_lkb(lkb);
3488 	return;
3489 
3490  fail:
3491 	setup_stub_lkb(ls, ms);
3492 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3493 }
3494 
3495 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3496 {
3497 	struct dlm_lkb *lkb;
3498 	struct dlm_rsb *r;
3499 	int error;
3500 
3501 	error = find_lkb(ls, ms->m_remid, &lkb);
3502 	if (error)
3503 		goto fail;
3504 
3505 	receive_flags(lkb, ms);
3506 
3507 	r = lkb->lkb_resource;
3508 
3509 	hold_rsb(r);
3510 	lock_rsb(r);
3511 
3512 	error = validate_message(lkb, ms);
3513 	if (error)
3514 		goto out;
3515 
3516 	error = do_cancel(r, lkb);
3517 	send_cancel_reply(r, lkb, error);
3518 	do_cancel_effects(r, lkb, error);
3519  out:
3520 	unlock_rsb(r);
3521 	put_rsb(r);
3522 	dlm_put_lkb(lkb);
3523 	return;
3524 
3525  fail:
3526 	setup_stub_lkb(ls, ms);
3527 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3528 }
3529 
3530 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3531 {
3532 	struct dlm_lkb *lkb;
3533 	struct dlm_rsb *r;
3534 	int error;
3535 
3536 	error = find_lkb(ls, ms->m_remid, &lkb);
3537 	if (error) {
3538 		log_debug(ls, "receive_grant from %d no lkb %x",
3539 			  ms->m_header.h_nodeid, ms->m_remid);
3540 		return;
3541 	}
3542 
3543 	r = lkb->lkb_resource;
3544 
3545 	hold_rsb(r);
3546 	lock_rsb(r);
3547 
3548 	error = validate_message(lkb, ms);
3549 	if (error)
3550 		goto out;
3551 
3552 	receive_flags_reply(lkb, ms);
3553 	if (is_altmode(lkb))
3554 		munge_altmode(lkb, ms);
3555 	grant_lock_pc(r, lkb, ms);
3556 	queue_cast(r, lkb, 0);
3557  out:
3558 	unlock_rsb(r);
3559 	put_rsb(r);
3560 	dlm_put_lkb(lkb);
3561 }
3562 
3563 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3564 {
3565 	struct dlm_lkb *lkb;
3566 	struct dlm_rsb *r;
3567 	int error;
3568 
3569 	error = find_lkb(ls, ms->m_remid, &lkb);
3570 	if (error) {
3571 		log_debug(ls, "receive_bast from %d no lkb %x",
3572 			  ms->m_header.h_nodeid, ms->m_remid);
3573 		return;
3574 	}
3575 
3576 	r = lkb->lkb_resource;
3577 
3578 	hold_rsb(r);
3579 	lock_rsb(r);
3580 
3581 	error = validate_message(lkb, ms);
3582 	if (error)
3583 		goto out;
3584 
3585 	queue_bast(r, lkb, ms->m_bastmode);
3586  out:
3587 	unlock_rsb(r);
3588 	put_rsb(r);
3589 	dlm_put_lkb(lkb);
3590 }
3591 
3592 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3593 {
3594 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3595 
3596 	from_nodeid = ms->m_header.h_nodeid;
3597 	our_nodeid = dlm_our_nodeid();
3598 
3599 	len = receive_extralen(ms);
3600 
3601 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3602 	if (dir_nodeid != our_nodeid) {
3603 		log_error(ls, "lookup dir_nodeid %d from %d",
3604 			  dir_nodeid, from_nodeid);
3605 		error = -EINVAL;
3606 		ret_nodeid = -1;
3607 		goto out;
3608 	}
3609 
3610 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3611 
3612 	/* Optimization: we're master so treat lookup as a request */
3613 	if (!error && ret_nodeid == our_nodeid) {
3614 		receive_request(ls, ms);
3615 		return;
3616 	}
3617  out:
3618 	send_lookup_reply(ls, ms, ret_nodeid, error);
3619 }
3620 
3621 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3622 {
3623 	int len, dir_nodeid, from_nodeid;
3624 
3625 	from_nodeid = ms->m_header.h_nodeid;
3626 
3627 	len = receive_extralen(ms);
3628 
3629 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3630 	if (dir_nodeid != dlm_our_nodeid()) {
3631 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3632 			  dir_nodeid, from_nodeid);
3633 		return;
3634 	}
3635 
3636 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3637 }
3638 
3639 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3640 {
3641 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3642 }
3643 
3644 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3645 {
3646 	struct dlm_lkb *lkb;
3647 	struct dlm_rsb *r;
3648 	int error, mstype, result;
3649 
3650 	error = find_lkb(ls, ms->m_remid, &lkb);
3651 	if (error) {
3652 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3653 			  ms->m_header.h_nodeid, ms->m_remid);
3654 		return;
3655 	}
3656 
3657 	r = lkb->lkb_resource;
3658 	hold_rsb(r);
3659 	lock_rsb(r);
3660 
3661 	error = validate_message(lkb, ms);
3662 	if (error)
3663 		goto out;
3664 
3665 	mstype = lkb->lkb_wait_type;
3666 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3667 	if (error)
3668 		goto out;
3669 
3670 	/* Optimization: the dir node was also the master, so it took our
3671 	   lookup as a request and sent a request reply instead of a lookup reply */
3672 	if (mstype == DLM_MSG_LOOKUP) {
3673 		r->res_nodeid = ms->m_header.h_nodeid;
3674 		lkb->lkb_nodeid = r->res_nodeid;
3675 	}
3676 
3677 	/* this is the value returned from do_request() on the master */
3678 	result = ms->m_result;
3679 
3680 	switch (result) {
3681 	case -EAGAIN:
3682 		/* request would block (be queued) on remote master */
3683 		queue_cast(r, lkb, -EAGAIN);
3684 		confirm_master(r, -EAGAIN);
3685 		unhold_lkb(lkb); /* undoes create_lkb() */
3686 		break;
3687 
3688 	case -EINPROGRESS:
3689 	case 0:
3690 		/* request was queued or granted on remote master */
3691 		receive_flags_reply(lkb, ms);
3692 		lkb->lkb_remid = ms->m_lkid;
3693 		if (is_altmode(lkb))
3694 			munge_altmode(lkb, ms);
3695 		if (result) {
3696 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3697 			add_timeout(lkb);
3698 		} else {
3699 			grant_lock_pc(r, lkb, ms);
3700 			queue_cast(r, lkb, 0);
3701 		}
3702 		confirm_master(r, result);
3703 		break;
3704 
3705 	case -EBADR:
3706 	case -ENOTBLK:
3707 		/* find_rsb failed to find rsb or rsb wasn't master */
3708 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3709 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3710 		r->res_nodeid = -1;
3711 		lkb->lkb_nodeid = -1;
3712 
3713 		if (is_overlap(lkb)) {
3714 			/* we'll ignore error in cancel/unlock reply */
3715 			queue_cast_overlap(r, lkb);
3716 			confirm_master(r, result);
3717 			unhold_lkb(lkb); /* undoes create_lkb() */
3718 		} else
3719 			_request_lock(r, lkb);
3720 		break;
3721 
3722 	default:
3723 		log_error(ls, "receive_request_reply %x error %d",
3724 			  lkb->lkb_id, result);
3725 	}
3726 
3727 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3728 		log_debug(ls, "receive_request_reply %x result %d unlock",
3729 			  lkb->lkb_id, result);
3730 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3731 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3732 		send_unlock(r, lkb);
3733 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3734 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3735 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3736 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3737 		send_cancel(r, lkb);
3738 	} else {
3739 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3740 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3741 	}
3742  out:
3743 	unlock_rsb(r);
3744 	put_rsb(r);
3745 	dlm_put_lkb(lkb);
3746 }
3747 
3748 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3749 				    struct dlm_message *ms)
3750 {
3751 	/* this is the value returned from do_convert() on the master */
3752 	switch (ms->m_result) {
3753 	case -EAGAIN:
3754 		/* convert would block (be queued) on remote master */
3755 		queue_cast(r, lkb, -EAGAIN);
3756 		break;
3757 
3758 	case -EDEADLK:
3759 		receive_flags_reply(lkb, ms);
3760 		revert_lock_pc(r, lkb);
3761 		queue_cast(r, lkb, -EDEADLK);
3762 		break;
3763 
3764 	case -EINPROGRESS:
3765 		/* convert was queued on remote master */
3766 		receive_flags_reply(lkb, ms);
3767 		if (is_demoted(lkb))
3768 			munge_demoted(lkb);
3769 		del_lkb(r, lkb);
3770 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3771 		add_timeout(lkb);
3772 		break;
3773 
3774 	case 0:
3775 		/* convert was granted on remote master */
3776 		receive_flags_reply(lkb, ms);
3777 		if (is_demoted(lkb))
3778 			munge_demoted(lkb);
3779 		grant_lock_pc(r, lkb, ms);
3780 		queue_cast(r, lkb, 0);
3781 		break;
3782 
3783 	default:
3784 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3785 			  lkb->lkb_id, ms->m_result);
3786 	}
3787 }
3788 
3789 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3790 {
3791 	struct dlm_rsb *r = lkb->lkb_resource;
3792 	int error;
3793 
3794 	hold_rsb(r);
3795 	lock_rsb(r);
3796 
3797 	error = validate_message(lkb, ms);
3798 	if (error)
3799 		goto out;
3800 
3801 	/* stub reply can happen with waiters_mutex held */
3802 	error = remove_from_waiters_ms(lkb, ms);
3803 	if (error)
3804 		goto out;
3805 
3806 	__receive_convert_reply(r, lkb, ms);
3807  out:
3808 	unlock_rsb(r);
3809 	put_rsb(r);
3810 }
3811 
3812 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3813 {
3814 	struct dlm_lkb *lkb;
3815 	int error;
3816 
3817 	error = find_lkb(ls, ms->m_remid, &lkb);
3818 	if (error) {
3819 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3820 			  ms->m_header.h_nodeid, ms->m_remid);
3821 		return;
3822 	}
3823 
3824 	_receive_convert_reply(lkb, ms);
3825 	dlm_put_lkb(lkb);
3826 }
3827 
3828 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3829 {
3830 	struct dlm_rsb *r = lkb->lkb_resource;
3831 	int error;
3832 
3833 	hold_rsb(r);
3834 	lock_rsb(r);
3835 
3836 	error = validate_message(lkb, ms);
3837 	if (error)
3838 		goto out;
3839 
3840 	/* stub reply can happen with waiters_mutex held */
3841 	error = remove_from_waiters_ms(lkb, ms);
3842 	if (error)
3843 		goto out;
3844 
3845 	/* this is the value returned from do_unlock() on the master */
3846 
3847 	switch (ms->m_result) {
3848 	case -DLM_EUNLOCK:
3849 		receive_flags_reply(lkb, ms);
3850 		remove_lock_pc(r, lkb);
3851 		queue_cast(r, lkb, -DLM_EUNLOCK);
3852 		break;
3853 	case -ENOENT:
3854 		break;
3855 	default:
3856 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3857 			  lkb->lkb_id, ms->m_result);
3858 	}
3859  out:
3860 	unlock_rsb(r);
3861 	put_rsb(r);
3862 }
3863 
3864 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3865 {
3866 	struct dlm_lkb *lkb;
3867 	int error;
3868 
3869 	error = find_lkb(ls, ms->m_remid, &lkb);
3870 	if (error) {
3871 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3872 			  ms->m_header.h_nodeid, ms->m_remid);
3873 		return;
3874 	}
3875 
3876 	_receive_unlock_reply(lkb, ms);
3877 	dlm_put_lkb(lkb);
3878 }
3879 
3880 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3881 {
3882 	struct dlm_rsb *r = lkb->lkb_resource;
3883 	int error;
3884 
3885 	hold_rsb(r);
3886 	lock_rsb(r);
3887 
3888 	error = validate_message(lkb, ms);
3889 	if (error)
3890 		goto out;
3891 
3892 	/* stub reply can happen with waiters_mutex held */
3893 	error = remove_from_waiters_ms(lkb, ms);
3894 	if (error)
3895 		goto out;
3896 
3897 	/* this is the value returned from do_cancel() on the master */
3898 
3899 	switch (ms->m_result) {
3900 	case -DLM_ECANCEL:
3901 		receive_flags_reply(lkb, ms);
3902 		revert_lock_pc(r, lkb);
3903 		queue_cast(r, lkb, -DLM_ECANCEL);
3904 		break;
3905 	case 0:
3906 		break;
3907 	default:
3908 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3909 			  lkb->lkb_id, ms->m_result);
3910 	}
3911  out:
3912 	unlock_rsb(r);
3913 	put_rsb(r);
3914 }
3915 
3916 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3917 {
3918 	struct dlm_lkb *lkb;
3919 	int error;
3920 
3921 	error = find_lkb(ls, ms->m_remid, &lkb);
3922 	if (error) {
3923 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3924 			  ms->m_header.h_nodeid, ms->m_remid);
3925 		return;
3926 	}
3927 
3928 	_receive_cancel_reply(lkb, ms);
3929 	dlm_put_lkb(lkb);
3930 }
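
/* The three reply receivers above share one skeleton: look up the lkb
   from ms->m_remid, pin and lock its rsb, validate the message, take
   the lkb off the waiters list, then apply the type-specific reply.
   A hypothetical common helper (a sketch, not the original code)
   showing that shape: */

static void receive_reply_common(struct dlm_lkb *lkb, struct dlm_message *ms,
				 void (*apply)(struct dlm_rsb *r,
					       struct dlm_lkb *lkb,
					       struct dlm_message *ms))
{
	struct dlm_rsb *r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	if (validate_message(lkb, ms))
		goto out;

	/* stub reply can happen with waiters_mutex held */
	if (remove_from_waiters_ms(lkb, ms))
		goto out;

	apply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}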
3931 
3932 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3933 {
3934 	struct dlm_lkb *lkb;
3935 	struct dlm_rsb *r;
3936 	int error, ret_nodeid;
3937 
3938 	error = find_lkb(ls, ms->m_lkid, &lkb);
3939 	if (error) {
3940 		log_error(ls, "receive_lookup_reply no lkb");
3941 		return;
3942 	}
3943 
3944 	/* ms->m_result is the value returned by dlm_dir_lookup on the dir node.
3945 	   FIXME: will a non-zero error ever be returned? */
3946 
3947 	r = lkb->lkb_resource;
3948 	hold_rsb(r);
3949 	lock_rsb(r);
3950 
3951 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3952 	if (error)
3953 		goto out;
3954 
3955 	ret_nodeid = ms->m_nodeid;
3956 	if (ret_nodeid == dlm_our_nodeid()) {
3957 		r->res_nodeid = 0;
3958 		ret_nodeid = 0;
3959 		r->res_first_lkid = 0;
3960 	} else {
3961 		/* set_master() will copy res_nodeid to lkb_nodeid */
3962 		r->res_nodeid = ret_nodeid;
3963 	}
3964 
3965 	if (is_overlap(lkb)) {
3966 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3967 			  lkb->lkb_id, lkb->lkb_flags);
3968 		queue_cast_overlap(r, lkb);
3969 		unhold_lkb(lkb); /* undoes create_lkb() */
3970 		goto out_list;
3971 	}
3972 
3973 	_request_lock(r, lkb);
3974 
3975  out_list:
3976 	if (!ret_nodeid)
3977 		process_lookup_list(r);
3978  out:
3979 	unlock_rsb(r);
3980 	put_rsb(r);
3981 	dlm_put_lkb(lkb);
3982 }
3983 
3984 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3985 {
3986 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3987 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3988 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3989 			  ms->m_remid, ms->m_result);
3990 		return;
3991 	}
3992 
3993 	switch (ms->m_type) {
3994 
3995 	/* messages sent to a master node */
3996 
3997 	case DLM_MSG_REQUEST:
3998 		receive_request(ls, ms);
3999 		break;
4000 
4001 	case DLM_MSG_CONVERT:
4002 		receive_convert(ls, ms);
4003 		break;
4004 
4005 	case DLM_MSG_UNLOCK:
4006 		receive_unlock(ls, ms);
4007 		break;
4008 
4009 	case DLM_MSG_CANCEL:
4010 		receive_cancel(ls, ms);
4011 		break;
4012 
4013 	/* messages sent from a master node (replies to above) */
4014 
4015 	case DLM_MSG_REQUEST_REPLY:
4016 		receive_request_reply(ls, ms);
4017 		break;
4018 
4019 	case DLM_MSG_CONVERT_REPLY:
4020 		receive_convert_reply(ls, ms);
4021 		break;
4022 
4023 	case DLM_MSG_UNLOCK_REPLY:
4024 		receive_unlock_reply(ls, ms);
4025 		break;
4026 
4027 	case DLM_MSG_CANCEL_REPLY:
4028 		receive_cancel_reply(ls, ms);
4029 		break;
4030 
4031 	/* messages sent from a master node (only two types of async msg) */
4032 
4033 	case DLM_MSG_GRANT:
4034 		receive_grant(ls, ms);
4035 		break;
4036 
4037 	case DLM_MSG_BAST:
4038 		receive_bast(ls, ms);
4039 		break;
4040 
4041 	/* messages sent to a dir node */
4042 
4043 	case DLM_MSG_LOOKUP:
4044 		receive_lookup(ls, ms);
4045 		break;
4046 
4047 	case DLM_MSG_REMOVE:
4048 		receive_remove(ls, ms);
4049 		break;
4050 
4051 	/* messages sent from a dir node (remove has no reply) */
4052 
4053 	case DLM_MSG_LOOKUP_REPLY:
4054 		receive_lookup_reply(ls, ms);
4055 		break;
4056 
4057 	/* other messages */
4058 
4059 	case DLM_MSG_PURGE:
4060 		receive_purge(ls, ms);
4061 		break;
4062 
4063 	default:
4064 		log_error(ls, "unknown message type %d", ms->m_type);
4065 	}
4066 }
4067 
4068 /* If the lockspace is in recovery mode (locking stopped), then normal
4069    messages are saved on the requestqueue for processing after recovery is
4070    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4071    messages off the requestqueue before we process new ones. This occurs right
4072    after recovery completes when we transition from saving all messages on
4073    requestqueue, to processing all the saved messages, to processing new
4074    messages as they arrive. */
4075 
4076 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4077 				int nodeid)
4078 {
4079 	if (dlm_locking_stopped(ls)) {
4080 		dlm_add_requestqueue(ls, nodeid, ms);
4081 	} else {
4082 		dlm_wait_requestqueue(ls);
4083 		_receive_message(ls, ms);
4084 	}
4085 }
4086 
4087 /* This is called by dlm_recoverd to process messages that were saved on
4088    the requestqueue. */
4089 
4090 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4091 {
4092 	_receive_message(ls, ms);
4093 }
4094 
4095 /* This is called by the midcomms layer when something is received for
4096    the lockspace.  It could be either a MSG (normal message sent as part of
4097    standard locking activity) or an RCOM (recovery message sent as part of
4098    lockspace recovery). */
4099 
4100 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4101 {
4102 	struct dlm_header *hd = &p->header;
4103 	struct dlm_ls *ls;
4104 	int type = 0;
4105 
4106 	switch (hd->h_cmd) {
4107 	case DLM_MSG:
4108 		dlm_message_in(&p->message);
4109 		type = p->message.m_type;
4110 		break;
4111 	case DLM_RCOM:
4112 		dlm_rcom_in(&p->rcom);
4113 		type = p->rcom.rc_type;
4114 		break;
4115 	default:
4116 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4117 		return;
4118 	}
4119 
4120 	if (hd->h_nodeid != nodeid) {
4121 		log_print("invalid h_nodeid %d from %d lockspace %x",
4122 			  hd->h_nodeid, nodeid, hd->h_lockspace);
4123 		return;
4124 	}
4125 
4126 	ls = dlm_find_lockspace_global(hd->h_lockspace);
4127 	if (!ls) {
4128 		if (dlm_config.ci_log_debug)
4129 			log_print("invalid lockspace %x from %d cmd %d type %d",
4130 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
4131 
4132 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4133 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4134 		return;
4135 	}
4136 
4137 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4138 	   be inactive (in this ls) before transitioning to recovery mode */
4139 
4140 	down_read(&ls->ls_recv_active);
4141 	if (hd->h_cmd == DLM_MSG)
4142 		dlm_receive_message(ls, &p->message, nodeid);
4143 	else
4144 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4145 	up_read(&ls->ls_recv_active);
4146 
4147 	dlm_put_lockspace(ls);
4148 }
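
/* Sketch of the write side of ls_recv_active; the real dlm_ls_stop()
   lives elsewhere (lockspace.c), so only the locking shape is shown
   here.  down_write() cannot succeed until every dlm_recv thread above
   has dropped its read lock, so no message is mid-delivery when the
   lockspace transitions into recovery mode. */

static void example_stop_receivers(struct dlm_ls *ls)
{
	down_write(&ls->ls_recv_active);  /* waits out all dlm_recv readers */
	/* ... mark locking stopped so new messages are saved ... */
	up_write(&ls->ls_recv_active);    /* let dlm_recv threads back in */
}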
4149 
4150 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4151 				   struct dlm_message *ms_stub)
4152 {
4153 	if (middle_conversion(lkb)) {
4154 		hold_lkb(lkb);
4155 		memset(ms_stub, 0, sizeof(struct dlm_message));
4156 		ms_stub->m_flags = DLM_IFL_STUB_MS;
4157 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4158 		ms_stub->m_result = -EINPROGRESS;
4159 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4160 		_receive_convert_reply(lkb, ms_stub);
4161 
4162 		/* Same special case as in receive_rcom_lock_args() */
4163 		lkb->lkb_grmode = DLM_LOCK_IV;
4164 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4165 		unhold_lkb(lkb);
4166 
4167 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4168 		lkb->lkb_flags |= DLM_IFL_RESEND;
4169 	}
4170 
4171 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4172 	   conversions are async; there's no reply from the remote master */
4173 }
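
/* The stub-reply setup above is repeated for unlock and cancel replies
   in dlm_recover_waiters_pre() below.  A hypothetical helper capturing
   the pattern (a sketch, not part of the original file): */

static void fake_stub_reply(struct dlm_lkb *lkb, struct dlm_message *ms_stub,
			    int type, int result)
{
	memset(ms_stub, 0, sizeof(struct dlm_message));
	ms_stub->m_flags = DLM_IFL_STUB_MS;
	ms_stub->m_type = type;
	ms_stub->m_result = result;
	ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
}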
4174 
4175 /* A waiting lkb needs recovery if the master node has failed, or
4176    the master node is changing (only when no directory is used) */
4177 
4178 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4179 {
4180 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
4181 		return 1;
4182 
4183 	if (!dlm_no_directory(ls))
4184 		return 0;
4185 
4186 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4187 		return 1;
4188 
4189 	return 0;
4190 }
4191 
4192 /* Recovery for locks that are waiting for replies from nodes that are now
4193    gone.  We can just complete unlocks and cancels by faking a reply from the
4194    dead node.  Requests and up-conversions we flag to be resent after
4195    recovery.  Down-conversions can just be completed with a fake reply like
4196    unlocks.  Conversions between PR and CW need special attention. */
4197 
4198 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4199 {
4200 	struct dlm_lkb *lkb, *safe;
4201 	struct dlm_message *ms_stub;
4202 	int wait_type, stub_unlock_result, stub_cancel_result;
4203 
4204 	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4205 	if (!ms_stub) {
4206 		log_error(ls, "dlm_recover_waiters_pre no mem");
4207 		return;
4208 	}
4209 
4210 	mutex_lock(&ls->ls_waiters_mutex);
4211 
4212 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4213 
4214 		/* exclude debug messages about unlocks because there can be so
4215 		   many and they aren't very interesting */
4216 
4217 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4218 			log_debug(ls, "recover_waiter %x nodeid %d "
4219 				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4220 				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4221 		}
4222 
4223 		/* all outstanding lookups, regardless of destination, will be
4224 		   resent after recovery is done */
4225 
4226 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4227 			lkb->lkb_flags |= DLM_IFL_RESEND;
4228 			continue;
4229 		}
4230 
4231 		if (!waiter_needs_recovery(ls, lkb))
4232 			continue;
4233 
4234 		wait_type = lkb->lkb_wait_type;
4235 		stub_unlock_result = -DLM_EUNLOCK;
4236 		stub_cancel_result = -DLM_ECANCEL;
4237 
4238 		/* Main reply may have been received leaving a zero wait_type,
4239 		   but a reply for the overlapping op may not have been
4240 		   received.  In that case we need to fake the appropriate
4241 		   reply for the overlap op. */
4242 
4243 		if (!wait_type) {
4244 			if (is_overlap_cancel(lkb)) {
4245 				wait_type = DLM_MSG_CANCEL;
4246 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4247 					stub_cancel_result = 0;
4248 			}
4249 			if (is_overlap_unlock(lkb)) {
4250 				wait_type = DLM_MSG_UNLOCK;
4251 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4252 					stub_unlock_result = -ENOENT;
4253 			}
4254 
4255 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4256 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
4257 				  stub_cancel_result, stub_unlock_result);
4258 		}
4259 
4260 		switch (wait_type) {
4261 
4262 		case DLM_MSG_REQUEST:
4263 			lkb->lkb_flags |= DLM_IFL_RESEND;
4264 			break;
4265 
4266 		case DLM_MSG_CONVERT:
4267 			recover_convert_waiter(ls, lkb, ms_stub);
4268 			break;
4269 
4270 		case DLM_MSG_UNLOCK:
4271 			hold_lkb(lkb);
4272 			memset(ms_stub, 0, sizeof(struct dlm_message));
4273 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4274 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4275 			ms_stub->m_result = stub_unlock_result;
4276 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4277 			_receive_unlock_reply(lkb, ms_stub);
4278 			dlm_put_lkb(lkb);
4279 			break;
4280 
4281 		case DLM_MSG_CANCEL:
4282 			hold_lkb(lkb);
4283 			memset(ms_stub, 0, sizeof(struct dlm_message));
4284 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4285 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4286 			ms_stub->m_result = stub_cancel_result;
4287 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4288 			_receive_cancel_reply(lkb, ms_stub);
4289 			dlm_put_lkb(lkb);
4290 			break;
4291 
4292 		default:
4293 			log_error(ls, "invalid lkb wait_type %d %d",
4294 				  lkb->lkb_wait_type, wait_type);
4295 		}
4296 		schedule();
4297 	}
4298 	mutex_unlock(&ls->ls_waiters_mutex);
4299 	kfree(ms_stub);
4300 }
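
/* Condensed view of the _pre policy above (illustrative summary only):

   wait_type         action
   ---------         ------
   DLM_MSG_LOOKUP    flag RESEND; resent by dlm_recover_waiters_post()
   DLM_MSG_REQUEST   flag RESEND; resent by dlm_recover_waiters_post()
   DLM_MSG_CONVERT   middle (PR/CW) conversion: fake -EINPROGRESS reply,
                     grmode = IV, set RSB_RECOVER_CONVERT;
                     up-conversion: flag RESEND
   DLM_MSG_UNLOCK    fake reply with -DLM_EUNLOCK (or -ENOENT for an
                     overlap unlock of an ungranted lkb)
   DLM_MSG_CANCEL    fake reply with -DLM_ECANCEL (or 0 for an overlap
                     cancel of an ungranted lkb) */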
4301 
4302 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4303 {
4304 	struct dlm_lkb *lkb;
4305 	int found = 0;
4306 
4307 	mutex_lock(&ls->ls_waiters_mutex);
4308 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4309 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4310 			hold_lkb(lkb);
4311 			found = 1;
4312 			break;
4313 		}
4314 	}
4315 	mutex_unlock(&ls->ls_waiters_mutex);
4316 
4317 	if (!found)
4318 		lkb = NULL;
4319 	return lkb;
4320 }
4321 
4322 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4323    master or dir-node for r.  Processing the lkb may result in it being placed
4324    back on waiters. */
4325 
4326 /* We do this after normal locking has been enabled and any saved messages
4327    (in requestqueue) have been processed.  We should be confident that at
4328    this point we won't get or process a reply to any of these waiting
4329    operations.  But, new ops may be coming in on the rsbs/locks here from
4330    userspace or remotely. */
4331 
4332 /* There may have been an overlap unlock/cancel prior to recovery or after
4333    recovery.  If before, the lkb may still have a positive wait_count; if
4334    after, the overlap flag would just have been set and nothing new sent.
4335    We can be confident here that any replies to either the initial op or
4336    overlap ops prior to recovery have been received. */
4337 
4338 int dlm_recover_waiters_post(struct dlm_ls *ls)
4339 {
4340 	struct dlm_lkb *lkb;
4341 	struct dlm_rsb *r;
4342 	int error = 0, mstype, err, oc, ou;
4343 
4344 	while (1) {
4345 		if (dlm_locking_stopped(ls)) {
4346 			log_debug(ls, "recover_waiters_post aborted");
4347 			error = -EINTR;
4348 			break;
4349 		}
4350 
4351 		lkb = find_resend_waiter(ls);
4352 		if (!lkb)
4353 			break;
4354 
4355 		r = lkb->lkb_resource;
4356 		hold_rsb(r);
4357 		lock_rsb(r);
4358 
4359 		mstype = lkb->lkb_wait_type;
4360 		oc = is_overlap_cancel(lkb);
4361 		ou = is_overlap_unlock(lkb);
4362 		err = 0;
4363 
4364 		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4365 			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4366 
4367 		/* At this point we assume that we won't get a reply to any
4368 		   previous op or overlap op on this lock.  First, do a big
4369 		   remove_from_waiters() for all previous ops. */
4370 
4371 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4372 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4373 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4374 		lkb->lkb_wait_type = 0;
4375 		lkb->lkb_wait_count = 0;
4376 		mutex_lock(&ls->ls_waiters_mutex);
4377 		list_del_init(&lkb->lkb_wait_reply);
4378 		mutex_unlock(&ls->ls_waiters_mutex);
4379 		unhold_lkb(lkb); /* for waiters list */
4380 
4381 		if (oc || ou) {
4382 			/* do an unlock or cancel instead of resending */
4383 			switch (mstype) {
4384 			case DLM_MSG_LOOKUP:
4385 			case DLM_MSG_REQUEST:
4386 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4387 							-DLM_ECANCEL);
4388 				unhold_lkb(lkb); /* undoes create_lkb() */
4389 				break;
4390 			case DLM_MSG_CONVERT:
4391 				if (oc) {
4392 					queue_cast(r, lkb, -DLM_ECANCEL);
4393 				} else {
4394 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4395 					_unlock_lock(r, lkb);
4396 				}
4397 				break;
4398 			default:
4399 				err = 1;
4400 			}
4401 		} else {
4402 			switch (mstype) {
4403 			case DLM_MSG_LOOKUP:
4404 			case DLM_MSG_REQUEST:
4405 				_request_lock(r, lkb);
4406 				if (is_master(r))
4407 					confirm_master(r, 0);
4408 				break;
4409 			case DLM_MSG_CONVERT:
4410 				_convert_lock(r, lkb);
4411 				break;
4412 			default:
4413 				err = 1;
4414 			}
4415 		}
4416 
4417 		if (err)
4418 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4419 			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4420 		unlock_rsb(r);
4421 		put_rsb(r);
4422 		dlm_put_lkb(lkb);
4423 	}
4424 
4425 	return error;
4426 }
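
/* Worked example of the overlap branch above (hypothetical scenario):
   an lkb was waiting on a DLM_MSG_REQUEST reply when its master died,
   and the application had meanwhile issued an unlock, so
   DLM_IFL_OVERLAP_UNLOCK was set.  _pre flagged the lkb RESEND; here
   oc=0, ou=1, mstype=DLM_MSG_REQUEST, so instead of resending the
   request we queue a -DLM_EUNLOCK cast and drop the create_lkb()
   reference -- the lock simply ceases to exist, which is what the
   unlock asked for. */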
4427 
4428 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4429 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4430 {
4431 	struct dlm_ls *ls = r->res_ls;
4432 	struct dlm_lkb *lkb, *safe;
4433 
4434 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4435 		if (test(ls, lkb)) {
4436 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4437 			del_lkb(r, lkb);
4438 			/* this put should free the lkb */
4439 			if (!dlm_put_lkb(lkb))
4440 				log_error(ls, "purged lkb not released");
4441 		}
4442 	}
4443 }
4444 
4445 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4446 {
4447 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4448 }
4449 
4450 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4451 {
4452 	return is_master_copy(lkb);
4453 }
4454 
4455 static void purge_dead_locks(struct dlm_rsb *r)
4456 {
4457 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4458 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4459 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4460 }
4461 
4462 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4463 {
4464 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4465 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4466 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4467 }
4468 
4469 /* Get rid of locks held by nodes that are gone. */
4470 
4471 int dlm_purge_locks(struct dlm_ls *ls)
4472 {
4473 	struct dlm_rsb *r;
4474 
4475 	log_debug(ls, "dlm_purge_locks");
4476 
4477 	down_write(&ls->ls_root_sem);
4478 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4479 		hold_rsb(r);
4480 		lock_rsb(r);
4481 		if (is_master(r))
4482 			purge_dead_locks(r);
4483 		unlock_rsb(r);
4484 		unhold_rsb(r);
4485 
4486 		schedule();
4487 	}
4488 	up_write(&ls->ls_root_sem);
4489 
4490 	return 0;
4491 }
4492 
4493 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4494 {
4495 	struct rb_node *n;
4496 	struct dlm_rsb *r, *r_ret = NULL;
4497 
4498 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4499 	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
4500 		r = rb_entry(n, struct dlm_rsb, res_hashnode);
4501 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4502 			continue;
4503 		hold_rsb(r);
4504 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4505 		r_ret = r;
4506 		break;
4507 	}
4508 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4509 	return r_ret;
4510 }
4511 
4512 void dlm_grant_after_purge(struct dlm_ls *ls)
4513 {
4514 	struct dlm_rsb *r;
4515 	int bucket = 0;
4516 
4517 	while (1) {
4518 		r = find_purged_rsb(ls, bucket);
4519 		if (!r) {
4520 			if (bucket == ls->ls_rsbtbl_size - 1)
4521 				break;
4522 			bucket++;
4523 			continue;
4524 		}
4525 		lock_rsb(r);
4526 		if (is_master(r)) {
4527 			grant_pending_locks(r);
4528 			confirm_master(r, 0);
4529 		}
4530 		unlock_rsb(r);
4531 		put_rsb(r);
4532 		schedule();
4533 	}
4534 }
4535 
4536 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4537 					 uint32_t remid)
4538 {
4539 	struct dlm_lkb *lkb;
4540 
4541 	list_for_each_entry(lkb, head, lkb_statequeue) {
4542 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4543 			return lkb;
4544 	}
4545 	return NULL;
4546 }
4547 
4548 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4549 				    uint32_t remid)
4550 {
4551 	struct dlm_lkb *lkb;
4552 
4553 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4554 	if (lkb)
4555 		return lkb;
4556 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4557 	if (lkb)
4558 		return lkb;
4559 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4560 	if (lkb)
4561 		return lkb;
4562 	return NULL;
4563 }
4564 
4565 /* needs at least dlm_rcom + rcom_lock */
4566 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4567 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4568 {
4569 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4570 
4571 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4572 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4573 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4574 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4575 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4576 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4577 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4578 	lkb->lkb_rqmode = rl->rl_rqmode;
4579 	lkb->lkb_grmode = rl->rl_grmode;
4580 	/* don't set lkb_status because add_lkb wants to itself */
4581 	/* don't set lkb_status because add_lkb wants to set it itself */
4582 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4583 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4584 
4585 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4586 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4587 			 sizeof(struct rcom_lock);
4588 		if (lvblen > ls->ls_lvblen)
4589 			return -EINVAL;
4590 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4591 		if (!lkb->lkb_lvbptr)
4592 			return -ENOMEM;
4593 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4594 	}
4595 
4596 	/* Conversions between PR and CW (middle modes) need special handling.
4597 	   The real granted mode of these converting locks cannot be determined
4598 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4599 
4600 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4601 	    middle_conversion(lkb)) {
4602 		rl->rl_status = DLM_LKSTS_CONVERT;
4603 		lkb->lkb_grmode = DLM_LOCK_IV;
4604 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4605 	}
4606 
4607 	return 0;
4608 }
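
/* Wire layout assumed by the lvblen computation above (illustrative):

   | struct dlm_rcom | struct rcom_lock | lvb: 0..ls_lvblen bytes |

   rc_header.h_length covers the whole packet, so the lvb length is
   whatever remains after the two fixed-size structures. */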
4609 
4610 /* This lkb may have been recovered in a previous aborted recovery so we need
4611    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4612    If so we just send back a standard reply.  If not, we create a new lkb with
4613    the given values and send back our lkid.  We send back our lkid by sending
4614    back the rcom_lock struct we got but with the remid field filled in. */
4615 
4616 /* needs at least dlm_rcom + rcom_lock */
4617 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4618 {
4619 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4620 	struct dlm_rsb *r;
4621 	struct dlm_lkb *lkb;
4622 	int error;
4623 
4624 	if (rl->rl_parent_lkid) {
4625 		error = -EOPNOTSUPP;
4626 		goto out;
4627 	}
4628 
4629 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4630 			 R_MASTER, &r);
4631 	if (error)
4632 		goto out;
4633 
4634 	lock_rsb(r);
4635 
4636 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4637 	if (lkb) {
4638 		error = -EEXIST;
4639 		goto out_remid;
4640 	}
4641 
4642 	error = create_lkb(ls, &lkb);
4643 	if (error)
4644 		goto out_unlock;
4645 
4646 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4647 	if (error) {
4648 		__put_lkb(ls, lkb);
4649 		goto out_unlock;
4650 	}
4651 
4652 	attach_lkb(r, lkb);
4653 	add_lkb(r, lkb, rl->rl_status);
4654 	error = 0;
4655 
4656  out_remid:
4657 	/* this is the new value returned to the lock holder for
4658 	   saving in its process-copy lkb */
4659 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4660 
4661  out_unlock:
4662 	unlock_rsb(r);
4663 	put_rsb(r);
4664  out:
4665 	if (error)
4666 		log_debug(ls, "recover_master_copy %d %x", error,
4667 			  le32_to_cpu(rl->rl_lkid));
4668 	rl->rl_result = cpu_to_le32(error);
4669 	return error;
4670 }
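
/* Recovery exchange assumed by the remid handling above (sketch; the
   initial sends come from dlm_recover_locks(), which waits for these
   replies -- see dlm_recovered_lock() below):

   L: dlm_send_rcom_lock()        ->  R: dlm_recover_master_copy()
   L: dlm_recover_process_copy()  <-  R: reply with rl_remid filled in
*/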
4671 
4672 /* needs at least dlm_rcom + rcom_lock */
4673 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4674 {
4675 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4676 	struct dlm_rsb *r;
4677 	struct dlm_lkb *lkb;
4678 	int error;
4679 
4680 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4681 	if (error) {
4682 		log_error(ls, "recover_process_copy no lkid %x",
4683 				le32_to_cpu(rl->rl_lkid));
4684 		return error;
4685 	}
4686 
4687 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4688 
4689 	error = le32_to_cpu(rl->rl_result);
4690 
4691 	r = lkb->lkb_resource;
4692 	hold_rsb(r);
4693 	lock_rsb(r);
4694 
4695 	switch (error) {
4696 	case -EBADR:
4697 		/* There's a chance the new master received our lock before
4698 		   dlm_recover_master_reply(); this wouldn't happen if we did
4699 		   a barrier between recover_masters and recover_locks. */
4700 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4701 			  (unsigned long)r, r->res_name);
4702 		dlm_send_rcom_lock(r, lkb);
4703 		goto out;
4704 	case -EEXIST:
4705 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4706 		/* fall through */
4707 	case 0:
4708 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4709 		break;
4710 	default:
4711 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4712 			  error, lkb->lkb_id);
4713 	}
4714 
4715 	/* an ack for dlm_recover_locks() which waits for replies from
4716 	   all the locks it sends to new masters */
4717 	dlm_recovered_lock(r);
4718  out:
4719 	unlock_rsb(r);
4720 	put_rsb(r);
4721 	dlm_put_lkb(lkb);
4722 
4723 	return 0;
4724 }
4725 
4726 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4727 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4728 		     unsigned long timeout_cs)
4729 {
4730 	struct dlm_lkb *lkb;
4731 	struct dlm_args args;
4732 	int error;
4733 
4734 	dlm_lock_recovery(ls);
4735 
4736 	error = create_lkb(ls, &lkb);
4737 	if (error) {
4738 		kfree(ua);
4739 		goto out;
4740 	}
4741 
4742 	if (flags & DLM_LKF_VALBLK) {
4743 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4744 		if (!ua->lksb.sb_lvbptr) {
4745 			kfree(ua);
4746 			__put_lkb(ls, lkb);
4747 			error = -ENOMEM;
4748 			goto out;
4749 		}
4750 	}
4751 
4752 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4753 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4754 	   lock and that lkb_astparam is the dlm_user_args structure. */
4755 
4756 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4757 			      fake_astfn, ua, fake_bastfn, &args);
4758 	lkb->lkb_flags |= DLM_IFL_USER;
4759 
4760 	if (error) {
4761 		__put_lkb(ls, lkb);
4762 		goto out;
4763 	}
4764 
4765 	error = request_lock(ls, lkb, name, namelen, &args);
4766 
4767 	switch (error) {
4768 	case 0:
4769 		break;
4770 	case -EINPROGRESS:
4771 		error = 0;
4772 		break;
4773 	case -EAGAIN:
4774 		error = 0;
4775 		/* fall through */
4776 	default:
4777 		__put_lkb(ls, lkb);
4778 		goto out;
4779 	}
4780 
4781 	/* add this new lkb to the per-process list of locks */
4782 	spin_lock(&ua->proc->locks_spin);
4783 	hold_lkb(lkb);
4784 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4785 	spin_unlock(&ua->proc->locks_spin);
4786  out:
4787 	dlm_unlock_recovery(ls);
4788 	return error;
4789 }
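
/* Result mapping above (illustrative): -EINPROGRESS and -EAGAIN are
   both reported to the caller as success, since the real outcome is
   delivered later through the completion ast; -EAGAIN additionally
   drops the new lkb here because the blocked request is already
   finished from the lock manager's point of view.  Only hard setup
   failures propagate as errors. */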
4790 
4791 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4792 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4793 		     unsigned long timeout_cs)
4794 {
4795 	struct dlm_lkb *lkb;
4796 	struct dlm_args args;
4797 	struct dlm_user_args *ua;
4798 	int error;
4799 
4800 	dlm_lock_recovery(ls);
4801 
4802 	error = find_lkb(ls, lkid, &lkb);
4803 	if (error)
4804 		goto out;
4805 
4806 	/* user can change the params on its lock when it converts it, or
4807 	   add an lvb that didn't exist before */
4808 
4809 	ua = lkb->lkb_ua;
4810 
4811 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4812 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4813 		if (!ua->lksb.sb_lvbptr) {
4814 			error = -ENOMEM;
4815 			goto out_put;
4816 		}
4817 	}
4818 	if (lvb_in && ua->lksb.sb_lvbptr)
4819 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4820 
4821 	ua->xid = ua_tmp->xid;
4822 	ua->castparam = ua_tmp->castparam;
4823 	ua->castaddr = ua_tmp->castaddr;
4824 	ua->bastparam = ua_tmp->bastparam;
4825 	ua->bastaddr = ua_tmp->bastaddr;
4826 	ua->user_lksb = ua_tmp->user_lksb;
4827 
4828 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4829 			      fake_astfn, ua, fake_bastfn, &args);
4830 	if (error)
4831 		goto out_put;
4832 
4833 	error = convert_lock(ls, lkb, &args);
4834 
4835 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4836 		error = 0;
4837  out_put:
4838 	dlm_put_lkb(lkb);
4839  out:
4840 	dlm_unlock_recovery(ls);
4841 	kfree(ua_tmp);
4842 	return error;
4843 }
4844 
4845 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4846 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4847 {
4848 	struct dlm_lkb *lkb;
4849 	struct dlm_args args;
4850 	struct dlm_user_args *ua;
4851 	int error;
4852 
4853 	dlm_lock_recovery(ls);
4854 
4855 	error = find_lkb(ls, lkid, &lkb);
4856 	if (error)
4857 		goto out;
4858 
4859 	ua = lkb->lkb_ua;
4860 
4861 	if (lvb_in && ua->lksb.sb_lvbptr)
4862 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4863 	if (ua_tmp->castparam)
4864 		ua->castparam = ua_tmp->castparam;
4865 	ua->user_lksb = ua_tmp->user_lksb;
4866 
4867 	error = set_unlock_args(flags, ua, &args);
4868 	if (error)
4869 		goto out_put;
4870 
4871 	error = unlock_lock(ls, lkb, &args);
4872 
4873 	if (error == -DLM_EUNLOCK)
4874 		error = 0;
4875 	/* from validate_unlock_args() */
4876 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4877 		error = 0;
4878 	if (error)
4879 		goto out_put;
4880 
4881 	spin_lock(&ua->proc->locks_spin);
4882 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
4883 	if (!list_empty(&lkb->lkb_ownqueue))
4884 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4885 	spin_unlock(&ua->proc->locks_spin);
4886  out_put:
4887 	dlm_put_lkb(lkb);
4888  out:
4889 	dlm_unlock_recovery(ls);
4890 	kfree(ua_tmp);
4891 	return error;
4892 }
4893 
4894 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4895 		    uint32_t flags, uint32_t lkid)
4896 {
4897 	struct dlm_lkb *lkb;
4898 	struct dlm_args args;
4899 	struct dlm_user_args *ua;
4900 	int error;
4901 
4902 	dlm_lock_recovery(ls);
4903 
4904 	error = find_lkb(ls, lkid, &lkb);
4905 	if (error)
4906 		goto out;
4907 
4908 	ua = lkb->lkb_ua;
4909 	if (ua_tmp->castparam)
4910 		ua->castparam = ua_tmp->castparam;
4911 	ua->user_lksb = ua_tmp->user_lksb;
4912 
4913 	error = set_unlock_args(flags, ua, &args);
4914 	if (error)
4915 		goto out_put;
4916 
4917 	error = cancel_lock(ls, lkb, &args);
4918 
4919 	if (error == -DLM_ECANCEL)
4920 		error = 0;
4921 	/* from validate_unlock_args() */
4922 	if (error == -EBUSY)
4923 		error = 0;
4924  out_put:
4925 	dlm_put_lkb(lkb);
4926  out:
4927 	dlm_unlock_recovery(ls);
4928 	kfree(ua_tmp);
4929 	return error;
4930 }
4931 
4932 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4933 {
4934 	struct dlm_lkb *lkb;
4935 	struct dlm_args args;
4936 	struct dlm_user_args *ua;
4937 	struct dlm_rsb *r;
4938 	int error;
4939 
4940 	dlm_lock_recovery(ls);
4941 
4942 	error = find_lkb(ls, lkid, &lkb);
4943 	if (error)
4944 		goto out;
4945 
4946 	ua = lkb->lkb_ua;
4947 
4948 	error = set_unlock_args(flags, ua, &args);
4949 	if (error)
4950 		goto out_put;
4951 
4952 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4953 
4954 	r = lkb->lkb_resource;
4955 	hold_rsb(r);
4956 	lock_rsb(r);
4957 
4958 	error = validate_unlock_args(lkb, &args);
4959 	if (error)
4960 		goto out_r;
4961 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4962 
4963 	error = _cancel_lock(r, lkb);
4964  out_r:
4965 	unlock_rsb(r);
4966 	put_rsb(r);
4967 
4968 	if (error == -DLM_ECANCEL)
4969 		error = 0;
4970 	/* from validate_unlock_args() */
4971 	if (error == -EBUSY)
4972 		error = 0;
4973  out_put:
4974 	dlm_put_lkb(lkb);
4975  out:
4976 	dlm_unlock_recovery(ls);
4977 	return error;
4978 }
4979 
4980 /* lkb's that are removed from the waiters list by revert are just left on the
4981    orphans list with the granted orphan locks, to be freed by purge */
4982 
4983 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4984 {
4985 	struct dlm_args args;
4986 	int error;
4987 
4988 	hold_lkb(lkb);
4989 	mutex_lock(&ls->ls_orphans_mutex);
4990 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4991 	mutex_unlock(&ls->ls_orphans_mutex);
4992 
4993 	set_unlock_args(0, lkb->lkb_ua, &args);
4994 
4995 	error = cancel_lock(ls, lkb, &args);
4996 	if (error == -DLM_ECANCEL)
4997 		error = 0;
4998 	return error;
4999 }
5000 
5001 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
5002    Regardless of what rsb queue the lock is on, it's removed and freed. */
5003 
5004 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5005 {
5006 	struct dlm_args args;
5007 	int error;
5008 
5009 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
5010 
5011 	error = unlock_lock(ls, lkb, &args);
5012 	if (error == -DLM_EUNLOCK)
5013 		error = 0;
5014 	return error;
5015 }
5016 
5017 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5018    (which does lock_rsb) due to deadlock with receiving a message that does
5019    lock_rsb followed by dlm_user_add_cb() */
5020 
5021 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5022 				     struct dlm_user_proc *proc)
5023 {
5024 	struct dlm_lkb *lkb = NULL;
5025 
5026 	mutex_lock(&ls->ls_clear_proc_locks);
5027 	if (list_empty(&proc->locks))
5028 		goto out;
5029 
5030 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5031 	list_del_init(&lkb->lkb_ownqueue);
5032 
5033 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5034 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
5035 	else
5036 		lkb->lkb_flags |= DLM_IFL_DEAD;
5037  out:
5038 	mutex_unlock(&ls->ls_clear_proc_locks);
5039 	return lkb;
5040 }
5041 
5042 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5043    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5044    which we clear here. */
5045 
5046 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5047    list, and no more device_writes should add lkb's to proc->locks list; so we
5048    shouldn't need to take asts_spin or locks_spin here.  This assumes that
5049    device reads/writes/closes are serialized -- FIXME: we may need to serialize
5050    them ourselves. */
5051 
5052 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5053 {
5054 	struct dlm_lkb *lkb, *safe;
5055 
5056 	dlm_lock_recovery(ls);
5057 
5058 	while (1) {
5059 		lkb = del_proc_lock(ls, proc);
5060 		if (!lkb)
5061 			break;
5062 		del_timeout(lkb);
5063 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5064 			orphan_proc_lock(ls, lkb);
5065 		else
5066 			unlock_proc_lock(ls, lkb);
5067 
5068 		/* this removes the reference for the proc->locks list
5069 		   added by dlm_user_request, it may result in the lkb
5070 		   being freed */
5071 
5072 		dlm_put_lkb(lkb);
5073 	}
5074 
5075 	mutex_lock(&ls->ls_clear_proc_locks);
5076 
5077 	/* in-progress unlocks */
5078 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5079 		list_del_init(&lkb->lkb_ownqueue);
5080 		lkb->lkb_flags |= DLM_IFL_DEAD;
5081 		dlm_put_lkb(lkb);
5082 	}
5083 
5084 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5085 		memset(&lkb->lkb_callbacks, 0,
5086 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5087 		list_del_init(&lkb->lkb_cb_list);
5088 		dlm_put_lkb(lkb);
5089 	}
5090 
5091 	mutex_unlock(&ls->ls_clear_proc_locks);
5092 	dlm_unlock_recovery(ls);
5093 }
5094 
5095 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5096 {
5097 	struct dlm_lkb *lkb, *safe;
5098 
5099 	while (1) {
5100 		lkb = NULL;
5101 		spin_lock(&proc->locks_spin);
5102 		if (!list_empty(&proc->locks)) {
5103 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
5104 					 lkb_ownqueue);
5105 			list_del_init(&lkb->lkb_ownqueue);
5106 		}
5107 		spin_unlock(&proc->locks_spin);
5108 
5109 		if (!lkb)
5110 			break;
5111 
5112 		lkb->lkb_flags |= DLM_IFL_DEAD;
5113 		unlock_proc_lock(ls, lkb);
5114 		dlm_put_lkb(lkb); /* ref from proc->locks list */
5115 	}
5116 
5117 	spin_lock(&proc->locks_spin);
5118 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5119 		list_del_init(&lkb->lkb_ownqueue);
5120 		lkb->lkb_flags |= DLM_IFL_DEAD;
5121 		dlm_put_lkb(lkb);
5122 	}
5123 	spin_unlock(&proc->locks_spin);
5124 
5125 	spin_lock(&proc->asts_spin);
5126 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5127 		memset(&lkb->lkb_callbacks, 0,
5128 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5129 		list_del_init(&lkb->lkb_cb_list);
5130 		dlm_put_lkb(lkb);
5131 	}
5132 	spin_unlock(&proc->asts_spin);
5133 }
5134 
5135 /* pid of 0 means purge all orphans */
5136 
5137 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5138 {
5139 	struct dlm_lkb *lkb, *safe;
5140 
5141 	mutex_lock(&ls->ls_orphans_mutex);
5142 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5143 		if (pid && lkb->lkb_ownpid != pid)
5144 			continue;
5145 		unlock_proc_lock(ls, lkb);
5146 		list_del_init(&lkb->lkb_ownqueue);
5147 		dlm_put_lkb(lkb);
5148 	}
5149 	mutex_unlock(&ls->ls_orphans_mutex);
5150 }
5151 
5152 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5153 {
5154 	struct dlm_message *ms;
5155 	struct dlm_mhandle *mh;
5156 	int error;
5157 
5158 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5159 				DLM_MSG_PURGE, &ms, &mh);
5160 	if (error)
5161 		return error;
5162 	ms->m_nodeid = nodeid;
5163 	ms->m_pid = pid;
5164 
5165 	return send_message(mh, ms);
5166 }
5167 
5168 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5169 		   int nodeid, int pid)
5170 {
5171 	int error = 0;
5172 
5173 	if (nodeid != dlm_our_nodeid()) {
5174 		error = send_purge(ls, nodeid, pid);
5175 	} else {
5176 		dlm_lock_recovery(ls);
5177 		if (pid == current->pid)
5178 			purge_proc_locks(ls, proc);
5179 		else
5180 			do_purge(ls, nodeid, pid);
5181 		dlm_unlock_recovery(ls);
5182 	}
5183 	return error;
5184 }
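
/* Purge flow sketch (illustrative): a purge aimed at another node is
   sent as a DLM_MSG_PURGE carrying (m_nodeid, m_pid) and lands in
   do_purge() via receive_purge().  Locally, the caller's own locks go
   through purge_proc_locks(); orphans of other pids go straight to
   do_purge().  A pid of 0 purges all orphans. */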
5185 
5186