xref: /openbmc/linux/fs/dlm/lock.c (revision a1e58bbd)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 				    struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 * 1 = the two modes may be held on the resource at the same time,
 * 0 = they conflict
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
111 
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 * (non-static: shared with other dlm source files)
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
132 
/* Test whether rq's requested mode is compatible with gr's granted mode.
   Both arguments are struct dlm_lkb pointers; each is evaluated once. */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135 
/* Return 1 if the two DLM_LOCK_* modes can be held on a resource at the
   same time, 0 if they conflict.  The +1 offset maps the lowest mode
   value onto row/column 0 of the matrix. */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
140 
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * (state abbreviations as for __dlm_compat_matrix above)
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
158 
/* Dump an lkb's identifying fields and state to the kernel log
   (used by DLM_ASSERT and for debugging) */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
167 
/* Dump an rsb's identifying fields to the kernel log */
void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
174 
/* Dump an rsb plus every lkb on its lookup, grant, convert and wait
   queues to the kernel log */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
196 
/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	/* shared hold on ls_in_recovery; released by dlm_unlock_recovery() */
	down_read(&ls->ls_in_recovery);
}
203 
/* Release the shared hold taken by dlm_lock_recovery() */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
208 
/* Nonblocking variant of dlm_lock_recovery(); returns nonzero on success */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
213 
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218 
/* Returns the raw DLM_LKF_NOQUEUEBAST bit: nonzero if the caller asked
   for blocking asts to be sent even when its request won't wait */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}
223 
/* Returns the raw DLM_SBF_DEMOTED status bit set by the last operation */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}
228 
/* Returns the raw DLM_SBF_ALTMODE status bit: the lock was granted in
   an alternate mode rather than the one requested */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}
233 
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238 
/* Nonzero if the rsb is mastered on another node (res_nodeid > 0).
   A negative res_nodeid means the master is still unknown and trips
   the assert. */
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}
244 
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249 
/* 1 if this lkb is the master's copy of a lock held by a process on
   another node (DLM_IFL_MSTCPY); such an lkb must carry the owner's
   nodeid, which the assert enforces. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
256 
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261 		return 1;
262 	return 0;
263 }
264 
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269 
/* Returns the raw DLM_IFL_OVERLAP_UNLOCK bit: an unlock was issued
   while another operation on this lkb was still awaiting a reply */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}
274 
/* Returns the raw DLM_IFL_OVERLAP_CANCEL bit: a cancel was issued
   while another operation on this lkb was still awaiting a reply */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}
279 
/* Nonzero if either overlap flag (unlock or cancel) is set */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
285 
/* Queue a completion ast with status rv for the local process holding
   lkb.  Master copies represent remote holders, so nothing is queued
   for them.  (r is unreferenced here; kept for symmetry with
   queue_bast.) */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	/* the operation is complete, so any pending timeout is moot */
	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	/* likewise, a cancel done to break a deadlock becomes -EDEADLK */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	/* results are delivered to the caller through its lksb */
	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
312 
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315 	queue_cast(r, lkb,
316 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318 
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321 	if (is_master_copy(lkb))
322 		send_bast(r, lkb, rqmode);
323 	else {
324 		lkb->lkb_bastmode = rqmode;
325 		dlm_add_ast(lkb, AST_BAST);
326 	}
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335 	struct dlm_rsb *r;
336 
337 	r = dlm_allocate_rsb(ls, len);
338 	if (!r)
339 		return NULL;
340 
341 	r->res_ls = ls;
342 	r->res_length = len;
343 	memcpy(r->res_name, name, len);
344 	mutex_init(&r->res_mutex);
345 
346 	INIT_LIST_HEAD(&r->res_lookup);
347 	INIT_LIST_HEAD(&r->res_grantqueue);
348 	INIT_LIST_HEAD(&r->res_convertqueue);
349 	INIT_LIST_HEAD(&r->res_waitqueue);
350 	INIT_LIST_HEAD(&r->res_root_list);
351 	INIT_LIST_HEAD(&r->res_recover_list);
352 
353 	return r;
354 }
355 
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357 			   unsigned int flags, struct dlm_rsb **r_ret)
358 {
359 	struct dlm_rsb *r;
360 	int error = 0;
361 
362 	list_for_each_entry(r, head, res_hashchain) {
363 		if (len == r->res_length && !memcmp(name, r->res_name, len))
364 			goto found;
365 	}
366 	return -EBADR;
367 
368  found:
369 	if (r->res_nodeid && (flags & R_MASTER))
370 		error = -ENOTBLK;
371 	*r_ret = r;
372 	return error;
373 }
374 
375 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
376 		       unsigned int flags, struct dlm_rsb **r_ret)
377 {
378 	struct dlm_rsb *r;
379 	int error;
380 
381 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
382 	if (!error) {
383 		kref_get(&r->res_ref);
384 		goto out;
385 	}
386 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
387 	if (error)
388 		goto out;
389 
390 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
391 
392 	if (dlm_no_directory(ls))
393 		goto out;
394 
395 	if (r->res_nodeid == -1) {
396 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
397 		r->res_first_lkid = 0;
398 	} else if (r->res_nodeid > 0) {
399 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
400 		r->res_first_lkid = 0;
401 	} else {
402 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
403 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
404 	}
405  out:
406 	*r_ret = r;
407 	return error;
408 }
409 
/* Locked wrapper around _search_rsb(): holds the bucket's rsbtbl write
   lock for the search, since a toss-list hit moves the rsb between
   lists */
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
419 
420 /*
421  * Find rsb in rsbtbl and potentially create/add one
422  *
423  * Delaying the release of rsb's has a similar benefit to applications keeping
424  * NL locks on an rsb, but without the guarantee that the cached master value
425  * will still be valid when the rsb is reused.  Apps aren't always smart enough
426  * to keep NL locks on an rsb that they may lock again shortly; this can lead
427  * to excessive master lookups and removals if we don't delay the release.
428  *
429  * Searching for an rsb means looking through both the normal list and toss
430  * list.  When found on the toss list the rsb is moved to the normal list with
431  * ref count of 1; when found on normal list the ref count is incremented.
432  */
433 
434 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
435 		    unsigned int flags, struct dlm_rsb **r_ret)
436 {
437 	struct dlm_rsb *r, *tmp;
438 	uint32_t hash, bucket;
439 	int error = -EINVAL;
440 
441 	if (namelen > DLM_RESNAME_MAXLEN)
442 		goto out;
443 
444 	if (dlm_no_directory(ls))
445 		flags |= R_CREATE;
446 
447 	error = 0;
448 	hash = jhash(name, namelen, 0);
449 	bucket = hash & (ls->ls_rsbtbl_size - 1);
450 
451 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
452 	if (!error)
453 		goto out;
454 
455 	if (error == -EBADR && !(flags & R_CREATE))
456 		goto out;
457 
458 	/* the rsb was found but wasn't a master copy */
459 	if (error == -ENOTBLK)
460 		goto out;
461 
462 	error = -ENOMEM;
463 	r = create_rsb(ls, name, namelen);
464 	if (!r)
465 		goto out;
466 
467 	r->res_hash = hash;
468 	r->res_bucket = bucket;
469 	r->res_nodeid = -1;
470 	kref_init(&r->res_ref);
471 
472 	/* With no directory, the master can be set immediately */
473 	if (dlm_no_directory(ls)) {
474 		int nodeid = dlm_dir_nodeid(r);
475 		if (nodeid == dlm_our_nodeid())
476 			nodeid = 0;
477 		r->res_nodeid = nodeid;
478 	}
479 
480 	write_lock(&ls->ls_rsbtbl[bucket].lock);
481 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
482 	if (!error) {
483 		write_unlock(&ls->ls_rsbtbl[bucket].lock);
484 		dlm_free_rsb(r);
485 		r = tmp;
486 		goto out;
487 	}
488 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
489 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
490 	error = 0;
491  out:
492 	*r_ret = r;
493 	return error;
494 }
495 
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking.
   Dropped again via put_rsb/unhold_rsb. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}
503 
/* Exported wrapper for hold_rsb() */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
508 
/* kref release callback: the last reference is gone, so park the rsb
   on its bucket's toss list for delayed disposal by shrink_bucket().
   Runs under the bucket lock held by the kref_put caller (put_rsb). */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	/* re-arm the refcount so a later toss-list revival starts at 1 */
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	/* stamp the toss time; shrink_bucket frees it after ci_toss_secs */
	r->res_toss_time = jiffies;
	/* the lock value block is not kept for a tossed rsb */
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
523 
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	/* the bucket lock protects the keep/toss list manipulation done
	   by toss_rsb if this drops the last reference */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}
536 
/* Exported wrapper for put_rsb() */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
541 
/* See comment for unhold_lkb: drop a reference that is known not to be
   the last one; the assert fires if it unexpectedly was. */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
550 
/* kref release callback used by shrink_bucket when finally freeing a
   tossed rsb; only asserts that every queue is empty. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
565 
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do: each attached lkb
   pins the rsb with one reference, dropped again in detach_lkb(). */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
574 
575 static void detach_lkb(struct dlm_lkb *lkb)
576 {
577 	if (lkb->lkb_resource) {
578 		put_rsb(lkb->lkb_resource);
579 		lkb->lkb_resource = NULL;
580 	}
581 }
582 
/* Allocate a new lkb and assign it a lockspace-unique 32-bit id:
   high 16 bits are a randomly chosen id-table bucket, low 16 bits that
   bucket's rolling counter.  The lkb (refcount 1) is hashed into the
   bucket's list.  Returns 0, or -ENOMEM on allocation failure. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	/* spread lkbs across the id table at random */
	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			/* collision: retry with the next counter value */
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
626 
627 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
628 {
629 	struct dlm_lkb *lkb;
630 	uint16_t bucket = (lkid >> 16);
631 
632 	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
633 		if (lkb->lkb_id == lkid)
634 			return lkb;
635 	}
636 	return NULL;
637 }
638 
/* Translate a lock id back to its lkb and take a reference on it.
   The bucket index lives in the lkid's top 16 bits (see create_lkb);
   an out-of-range bucket means a corrupt or foreign id.  Returns 0
   with *lkb_ret set, -EBADSLT for a bad bucket, -ENOENT if absent. */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
656 
/* kref release callback for the last lkb reference; only asserts the
   lkb is no longer on any rsb status queue. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
666 
/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly.
   Drops one reference; on the last one the lkb is unhashed, detached
   from its rsb and freed.  Returns 1 if freed, 0 otherwise. */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		/* last ref: unhash under the lock, free outside it */
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
691 
/* Drop an lkb reference, deriving the lockspace from the attached rsb
   (which must exist here).  Returns 1 if the lkb was freed. */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
702 
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
710 
/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	/* the assert fires if this unexpectedly was the last reference */
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
722 
723 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
724 			    int mode)
725 {
726 	struct dlm_lkb *lkb = NULL;
727 
728 	list_for_each_entry(lkb, head, lkb_statequeue)
729 		if (lkb->lkb_rqmode < mode)
730 			break;
731 
732 	if (!lkb)
733 		list_add_tail(new, head);
734 	else
735 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
736 }
737 
/* add/remove lkb to rsb's grant/convert/wait queue */

/* Put lkb on the rsb queue matching its new status, taking one queue
   reference.  DLM_LKF_HEADQUE adds at the head instead of the tail;
   the grant queue is additionally kept ordered by grmode. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	/* the lkb must not already be on a status queue */
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
771 
/* Take lkb off whichever rsb status queue it is on, clearing its
   status and dropping the queue reference added by add_lkb(). */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
778 
/* Move lkb from its current rsb queue to the one for sts.  The extra
   hold/unhold pair keeps the lkb alive across the del/add, where the
   queue reference is momentarily dropped. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
786 
787 static int msg_reply_type(int mstype)
788 {
789 	switch (mstype) {
790 	case DLM_MSG_REQUEST:
791 		return DLM_MSG_REQUEST_REPLY;
792 	case DLM_MSG_CONVERT:
793 		return DLM_MSG_CONVERT_REPLY;
794 	case DLM_MSG_UNLOCK:
795 		return DLM_MSG_UNLOCK_REPLY;
796 	case DLM_MSG_CANCEL:
797 		return DLM_MSG_CANCEL_REPLY;
798 	case DLM_MSG_LOOKUP:
799 		return DLM_MSG_LOOKUP_REPLY;
800 	}
801 	return -1;
802 }
803 
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

/* Register lkb as waiting for a remote reply of type mstype.  An lkb
   that is already waiting may only gain an overlapping unlock or
   cancel (recorded as a flag, not a second wait type); a second unlock,
   a repeated cancel, or any other doubled operation fails.  Each
   registration takes a reference and bumps lkb_wait_count. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	/* an unlock already overlaps, or a cancel is being doubled */
	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
857 
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

/* Resolve one expected reply of type mstype: clear the matching overlap
   flag or the wait type, drop one wait reference, and take the lkb off
   the waiters list once no replies remain.  Returns 0, or -1 when no
   matching wait state exists.  Caller holds ls_waiters_mutex (except
   for stub replies; see remove_from_waiters_ms). */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
914 
/* Locked wrapper around _remove_from_waiters() for normal (non-stub)
   reply processing */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
925 
926 /* Handles situations where we might be processing a "fake" or "stub" reply in
927    which we can't try to take waiters_mutex again. */
928 
929 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
930 {
931 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
932 	int error;
933 
934 	if (ms != &ls->ls_stub_ms)
935 		mutex_lock(&ls->ls_waiters_mutex);
936 	error = _remove_from_waiters(lkb, ms->m_type);
937 	if (ms != &ls->ls_stub_ms)
938 		mutex_unlock(&ls->ls_waiters_mutex);
939 	return error;
940 }
941 
942 static void dir_remove(struct dlm_rsb *r)
943 {
944 	int to_nodeid;
945 
946 	if (dlm_no_directory(r->res_ls))
947 		return;
948 
949 	to_nodeid = dlm_dir_nodeid(r);
950 	if (to_nodeid != dlm_our_nodeid())
951 		send_remove(r);
952 	else
953 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
954 				     r->res_name, r->res_length);
955 }
956 
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

/* Free rsbs that have sat on bucket b's toss list for longer than
   ci_toss_secs.  The bucket lock is retaken for each candidate and
   dropped before the directory removal and free.  Returns the number
   of rsbs freed. */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		/* oldest entries are at the tail, so scan in reverse */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			/* unhash under the lock; free after dropping it */
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			/* a tossed rsb should have no other references */
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
998 
999 void dlm_scan_rsbs(struct dlm_ls *ls)
1000 {
1001 	int i;
1002 
1003 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1004 		shrink_bucket(ls, i);
1005 		if (dlm_locking_stopped(ls))
1006 			break;
1007 		cond_resched();
1008 	}
1009 }
1010 
/* Arm timeout handling for lkb: put it on the lockspace timeout list
   when a timeout warning (LSFL_TIMEWARN, unless suppressed by
   DLM_LKF_NODLCKWT) or an explicit DLM_LKF_TIMEOUT applies.  Master
   copies only record a timestamp; the owning node runs the timeout. */
static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb)) {
		lkb->lkb_timestamp = jiffies;
		return;
	}

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	/* the timeout list holds its own reference on the lkb */
	hold_lkb(lkb);
	lkb->lkb_timestamp = jiffies;
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}
1037 
/* Disarm timeout handling: take lkb off the timeout list (if present)
   and drop the reference the list held.  Safe to call repeatedly. */
static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
1049 
1050 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1051    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1052    and then lock rsb because of lock ordering in add_timeout.  We may need
1053    to specify some special timeout-related bits in the lkb that are just to
1054    be accessed under the timeout_mutex. */
1055 
/* Periodically scan the timeout list, cancelling locks whose
   DLM_LKF_TIMEOUT period has expired and issuing a one-shot warning
   (dlm_timeout_warn) for locks being watched via LSFL_TIMEWARN.
   Processes one lkb per outer-loop pass so the rsb can be locked
   without holding ls_timeout_mutex (see lock-ordering FIXME above). */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			/* lkb_timeout_cs and ci_timewarn_cs are in
			   centiseconds, hence the * HZ/100 */
			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
					  lkb->lkb_timeout_cs * HZ/100))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
				   	   dlm_config.ci_timewarn_cs * HZ/100))
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			/* hold lkb so it survives after the mutex drops;
			   released by dlm_put_lkb below */
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		/* note: lkb is only valid past this point when one of
		   the flags was set (the loop broke out on it) */
		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
1117 
1118 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1119    dlm_recoverd before checking/setting ls_recover_begin. */
1120 
/* Shift every timestamp forward by the time spent in recovery so that
   locks don't appear to have timed out while locking was stopped. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	long adj = jiffies - ls->ls_recover_begin;

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp += adj;
	mutex_unlock(&ls->ls_timeout_mutex);
}
1132 
1133 /* lkb is master or local copy */
1134 
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* copy the rsb's lvb out to the caller's buffer */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		/* write the caller's lvb into the rsb (or invalidate) */
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* allocate the rsb's lvb buffer lazily; on allocation
		   failure the write is silently skipped */
		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	/* tell the caller when the lvb contents are not valid */
	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1185 
/* On unlock, write the lkb's lvb back into the rsb.  Only a lock held
   at PW or EX may update (or invalidate) the resource's lvb. */

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	/* allocate the rsb's lvb buffer lazily; skip the write if the
	   allocation fails */
	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
1212 
1213 /* lkb is process copy (pc) */
1214 
1215 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1216 			    struct dlm_message *ms)
1217 {
1218 	int b;
1219 
1220 	if (!lkb->lkb_lvbptr)
1221 		return;
1222 
1223 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1224 		return;
1225 
1226 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1227 	if (b == 1) {
1228 		int len = receive_extralen(ms);
1229 		if (len > DLM_RESNAME_MAXLEN)
1230 			len = DLM_RESNAME_MAXLEN;
1231 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1232 		lkb->lkb_lvbseq = ms->m_lvbseq;
1233 	}
1234 }
1235 
1236 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1237    remove_lock -- used for unlock, removes lkb from granted
1238    revert_lock -- used for cancel, moves lkb from convert to granted
1239    grant_lock  -- used for request and convert, adds lkb to granted or
1240                   moves lkb from convert or waiting to granted
1241 
1242    Each of these is used for master or local copy lkb's.  There is
1243    also a _pc() variation used to make the corresponding change on
1244    a process copy (pc) lkb. */
1245 
/* Take lkb off its rsb queue and clear its granted mode. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1254 
/* Unlock on the master/local copy: write the lvb back before
   removing the lock from the granted queue. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1260 
/* Unlock on a process copy: no lvb handling, the master side did it. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1265 
1266 /* returns: 0 did nothing
1267 	    1 moved lock to granted
1268 	   -1 removed lock */
1269 
1270 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1271 {
1272 	int rv = 0;
1273 
1274 	lkb->lkb_rqmode = DLM_LOCK_IV;
1275 
1276 	switch (lkb->lkb_status) {
1277 	case DLM_LKSTS_GRANTED:
1278 		break;
1279 	case DLM_LKSTS_CONVERT:
1280 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1281 		rv = 1;
1282 		break;
1283 	case DLM_LKSTS_WAITING:
1284 		del_lkb(r, lkb);
1285 		lkb->lkb_grmode = DLM_LOCK_IV;
1286 		/* this unhold undoes the original ref from create_lkb()
1287 		   so this leads to the lkb being freed */
1288 		unhold_lkb(lkb);
1289 		rv = -1;
1290 		break;
1291 	default:
1292 		log_print("invalid status for revert %d", lkb->lkb_status);
1293 	}
1294 	return rv;
1295 }
1296 
/* Cancel on a process copy; identical to the master-copy revert. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1301 
1302 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1303 {
1304 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1305 		lkb->lkb_grmode = lkb->lkb_rqmode;
1306 		if (lkb->lkb_status)
1307 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1308 		else
1309 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310 	}
1311 
1312 	lkb->lkb_rqmode = DLM_LOCK_IV;
1313 }
1314 
/* Grant on the master/local copy: handle the lvb, grant, and reset
   the highest-bast mode so future blocking asts are sent again. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1321 
/* Grant on a process copy: the lvb (if any) arrives in the message. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1328 
1329 /* called by grant_pending_locks() which means an async grant message must
1330    be sent to the requesting node in addition to granting the lock if the
1331    lkb belongs to a remote node. */
1332 
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}
1341 
1342 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1343    change the granted/requested modes.  We're munging things accordingly in
1344    the process copy.
1345    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1346    conversion deadlock
1347    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1348    compatible with other granted locks */
1349 
1350 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1351 {
1352 	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1353 		log_print("munge_demoted %x invalid reply type %d",
1354 			  lkb->lkb_id, ms->m_type);
1355 		return;
1356 	}
1357 
1358 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1359 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1360 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1361 		return;
1362 	}
1363 
1364 	lkb->lkb_grmode = DLM_LOCK_NL;
1365 }
1366 
1367 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1368 {
1369 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1370 	    ms->m_type != DLM_MSG_GRANT) {
1371 		log_print("munge_altmode %x invalid reply type %d",
1372 			  lkb->lkb_id, ms->m_type);
1373 		return;
1374 	}
1375 
1376 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1377 		lkb->lkb_rqmode = DLM_LOCK_PR;
1378 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1379 		lkb->lkb_rqmode = DLM_LOCK_CW;
1380 	else {
1381 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1382 		dlm_print_lkb(lkb);
1383 	}
1384 }
1385 
1386 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1387 {
1388 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1389 					   lkb_statequeue);
1390 	if (lkb->lkb_id == first->lkb_id)
1391 		return 1;
1392 
1393 	return 0;
1394 }
1395 
1396 /* Check if the given lkb conflicts with another lkb on the queue. */
1397 
1398 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1399 {
1400 	struct dlm_lkb *this;
1401 
1402 	list_for_each_entry(this, head, lkb_statequeue) {
1403 		if (this == lkb)
1404 			continue;
1405 		if (!modes_compat(this, lkb))
1406 			return 1;
1407 	}
1408 	return 0;
1409 }
1410 
1411 /*
1412  * "A conversion deadlock arises with a pair of lock requests in the converting
1413  * queue for one resource.  The granted mode of each lock blocks the requested
1414  * mode of the other lock."
1415  *
1416  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1417  * convert queue from being granted, then deadlk/demote lkb.
1418  *
1419  * Example:
1420  * Granted Queue: empty
1421  * Convert Queue: NL->EX (first lock)
1422  *                PR->EX (second lock)
1423  *
1424  * The first lock can't be granted because of the granted mode of the second
1425  * lock and the second lock can't be granted because it's not first in the
1426  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1427  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1428  * flag set and return DEMOTED in the lksb flags.
1429  *
1430  * Originally, this function detected conv-deadlk in a more limited scope:
1431  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1432  * - if lkb1 was the first entry in the queue (not just earlier), and was
1433  *   blocked by the granted mode of lkb2, and there was nothing on the
1434  *   granted queue preventing lkb1 from being granted immediately, i.e.
1435  *   lkb2 was the only thing preventing lkb1 from being granted.
1436  *
1437  * That second condition meant we'd only say there was conv-deadlk if
1438  * resolving it (by demotion) would lead to the first lock on the convert
1439  * queue being granted right away.  It allowed conversion deadlocks to exist
1440  * between locks on the convert queue while they couldn't be granted anyway.
1441  *
1442  * Now, we detect and take action on conversion deadlocks immediately when
1443  * they're created, even if they may not be immediately consequential.  If
1444  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1445  * mode that would prevent lkb1's conversion from being granted, we do a
1446  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1447  * I think this means that the lkb_is_ahead condition below should always
1448  * be zero, i.e. there will never be conv-deadlk between two locks that are
1449  * both already on the convert queue.
1450  */
1451 
static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			/* lkb1 is earlier in the queue: lkb2's granted
			   mode blocking lkb1's conversion is enough */
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			/* lkb1 is later: require mutual blocking (per the
			   comment above, this case should not occur) */
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}
1474 
1475 /*
1476  * Return 1 if the lock can be granted, 0 otherwise.
1477  * Also detect and resolve conversion deadlocks.
1478  *
1479  * lkb is the lock to be granted
1480  *
1481  * now is 1 if the function is being called in the context of the
1482  * immediate request, it is 0 if called later, after the lock has been
1483  * queued.
1484  *
1485  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1486  */
1487 
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
1603 
/* Wrapper around _can_be_granted() that also resolves conversion
   deadlocks (demote with CONVDEADLK, otherwise report -EDEADLK via
   *err) and retries the grant in an alternate mode for ALTPR/ALTCW
   requests.  err may be NULL when the caller can't accept a deadlock
   result (the deadlock is then only logged). */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		/* retry with the alternate mode; restore rqmode on failure */
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
1664 
1665 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1666    for locks pending on the convert list.  Once verified (watch for these
1667    log_prints), we should be able to just call _can_be_granted() and not
1668    bother with the demote/deadlk cases here (and there's no easy way to deal
1669    with a deadlk here, we'd have to generate something like grant_lock with
1670    the deadlk error.) */
1671 
1672 /* Returns the highest requested mode of all blocked conversions; sets
1673    cw if there's a blocked conversion to DLM_LOCK_CW. */
1674 
static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted the lock to resolve a conversion
		   deadlock; rescan since other converts may now be grantable
		   (see the FIXME above: not expected to happen here) */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		/* still blocked: track the highest blocked rqmode */
		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	/* granting may unblock earlier entries; demotions get exactly one
	   extra pass (quit) to avoid looping forever */
	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1726 
1727 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1728 {
1729 	struct dlm_lkb *lkb, *s;
1730 
1731 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1732 		if (can_be_granted(r, lkb, 0, NULL))
1733 			grant_lock_pending(r, lkb);
1734                 else {
1735 			high = max_t(int, lkb->lkb_rqmode, high);
1736 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1737 				*cw = 1;
1738 		}
1739 	}
1740 
1741 	return high;
1742 }
1743 
1744 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1745    on either the convert or waiting queue.
1746    high is the largest rqmode of all locks blocked on the convert or
1747    waiting queue. */
1748 
1749 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1750 {
1751 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1752 		if (gr->lkb_highbast < DLM_LOCK_EX)
1753 			return 1;
1754 		return 0;
1755 	}
1756 
1757 	if (gr->lkb_highbast < high &&
1758 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1759 		return 1;
1760 	return 0;
1761 }
1762 
/* Grant everything grantable on the convert and wait queues, then send
   blocking asts to granted locks that stand in the way of what's left.
   Must be called on the master node with the rsb locked. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;
	int cw = 0;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high, &cw);
	high = grant_pending_wait(r, high, &cw);

	/* nothing left blocked, so no asts needed */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
			/* a blocked CW beats a blocked PR as the mode to
			   report, since PR holders don't block other PRs */
			if (cw && high == DLM_LOCK_PR)
				queue_bast(r, lkb, DLM_LOCK_CW);
			else
				queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1793 
1794 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1795 {
1796 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1797 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1798 		if (gr->lkb_highbast < DLM_LOCK_EX)
1799 			return 1;
1800 		return 0;
1801 	}
1802 
1803 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1804 		return 1;
1805 	return 0;
1806 }
1807 
/* Queue a blocking ast (at lkb's requested mode) for every lock on the
   given queue that blocks lkb and has a bast callback registered. */

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			/* remember what we basted to avoid repeats */
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}
1820 
/* Blocking asts to holders on the granted queue only. */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1825 
/* Blocking asts to holders on both the granted and convert queues. */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1831 
1832 /* set_master(r, lkb) -- set the master nodeid of a resource
1833 
1834    The purpose of this function is to set the nodeid field in the given
1835    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1836    known, it can just be copied to the lkb and the function will return
1837    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1838    before it can be copied to the lkb.
1839 
1840    When the rsb nodeid is being looked up remotely, the initial lkb
1841    causing the lookup is kept on the ls_waiters list waiting for the
1842    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1843    on the rsb's res_lookup list until the master is verified.
1844 
1845    Return values:
1846    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1847    1: the rsb master is not available and the lkb has been placed on
1848       a wait queue
1849 */
1850 
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* after recovery the master may need re-verification; this first
	   lkb re-drives that by becoming first_lkid */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb's lookup is in flight; wait on the rsb's lookup
	   list until the master is confirmed */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid == 0 means we are the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* positive res_nodeid is a known remote master */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* master unknown: ask the directory node, remotely or locally */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1916 
/* Re-drive every lkb that was parked on the rsb's lookup list while a
   master lookup was in flight (see set_master). */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}
1927 
1928 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1929 
/* confirm_master -- confirm (or deny) an rsb's master nodeid

   Called with the result of the first lkb's remote request; error
   decides whether the master is confirmed (release the waiting lkb's)
   or denied (re-drive a waiting lkb as the new first_lkid). */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	/* nothing to confirm if no lookup was in progress */
	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		/* request succeeded or was queued remotely: the master
		   is confirmed, release the parked lkb's */
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
1967 
1968 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1969 			 int namelen, unsigned long timeout_cs,
1970 			 void (*ast) (void *astparam),
1971 			 void *astparam,
1972 			 void (*bast) (void *astparam, int mode),
1973 			 struct dlm_args *args)
1974 {
1975 	int rv = -EINVAL;
1976 
1977 	/* check for invalid arg usage */
1978 
1979 	if (mode < 0 || mode > DLM_LOCK_EX)
1980 		goto out;
1981 
1982 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1983 		goto out;
1984 
1985 	if (flags & DLM_LKF_CANCEL)
1986 		goto out;
1987 
1988 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1989 		goto out;
1990 
1991 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1992 		goto out;
1993 
1994 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1995 		goto out;
1996 
1997 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1998 		goto out;
1999 
2000 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2001 		goto out;
2002 
2003 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2004 		goto out;
2005 
2006 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2007 		goto out;
2008 
2009 	if (!ast || !lksb)
2010 		goto out;
2011 
2012 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2013 		goto out;
2014 
2015 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2016 		goto out;
2017 
2018 	/* these args will be copied to the lkb in validate_lock_args,
2019 	   it cannot be done now because when converting locks, fields in
2020 	   an active lkb cannot be modified before locking the rsb */
2021 
2022 	args->flags = flags;
2023 	args->astfn = ast;
2024 	args->astparam = astparam;
2025 	args->bastfn = bast;
2026 	args->timeout = timeout_cs;
2027 	args->mode = mode;
2028 	args->lksb = lksb;
2029 	rv = 0;
2030  out:
2031 	return rv;
2032 }
2033 
2034 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2035 {
2036 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2037  		      DLM_LKF_FORCEUNLOCK))
2038 		return -EINVAL;
2039 
2040 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2041 		return -EINVAL;
2042 
2043 	args->flags = flags;
2044 	args->astparam = astarg;
2045 	return 0;
2046 }
2047 
/* Copy the validated args into the lkb, with extra checks that only
   apply to conversions (must be a local, granted, idle lock).
   Called with the rsb locked.  Returns 0, -EINVAL or -EBUSY. */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* can't convert a master copy; conversions come from the
		   lock's owning node */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		/* QUECVT is only valid for certain grmode->rqmode
		   transitions (see __quecvt_compat_matrix) */
		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		/* only a granted lock can be converted */
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* an operation is already outstanding on this lkb */
		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astfn = args->astfn;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastfn = args->bastfn;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	return rv;
}
2086 
2087 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2088    for success */
2089 
2090 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2091    because there may be a lookup in progress and it's valid to do
2092    cancel/unlockf on it */
2093 
/* Validate a cancel/unlock/force-unlock against the lkb's current
   state and record the op's flags/astparam in the lkb.  Returns 0 on
   ok, -EINVAL for disallowed ops, -ENOENT for end-of-life lkbs, and
   -EBUSY when the op overlaps one already in progress (the caller
   turns -EBUSY into 0 for CANCEL/FORCEUNLOCK). */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			/* complete the never-started request right here */
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* original op will be resent during recovery; just mark
		   the overlap and let that path sort it out */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2216 
2217 /*
2218  * Four stage 4 varieties:
2219  * do_request(), do_convert(), do_unlock(), do_cancel()
2220  * These are called on the master node for the given lock and
2221  * from the central locking logic.
2222  */
2223 
/* Stage-4 request, run on the master node with the rsb locked.
   Returns 0 (granted immediately), -EINPROGRESS (queued waiting), or
   -EAGAIN (NOQUEUE set and the lock could not be granted). */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	/* grantable now: move to granted queue, queue completion ast */
	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	/* blocked but queueable (no DLM_LKF_NOQUEUE) */
	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	/* blocked and NOQUEUE: fail the request */
	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2250 
/* Stage-4 convert, run on the master node with the rsb locked.
   Returns 0 (granted), -EDEADLK (conversion deadlock detected and the
   convert reverted), -EINPROGRESS (moved to convert queue), or
   -EAGAIN (NOQUEUE and not grantable). */

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV, NULL);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	/* blocked but queueable: move from granted to convert queue */
	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	/* blocked and NOQUEUE: fail the convert */
	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2314 
/* Stage-4 unlock: remove the lock from its queue, queue the
   -DLM_EUNLOCK completion ast, and re-check what is now grantable.
   Always returns -DLM_EUNLOCK (callers translate this to success). */

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}
2322 
2323 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2324 
2325 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2326 {
2327 	int error;
2328 
2329 	error = revert_lock(r, lkb);
2330 	if (error) {
2331 		queue_cast(r, lkb, -DLM_ECANCEL);
2332 		grant_pending_locks(r);
2333 		return -DLM_ECANCEL;
2334 	}
2335 	return 0;
2336 }
2337 
2338 /*
2339  * Four stage 3 varieties:
2340  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2341  */
2342 
2343 /* add a new lkb to a possibly new rsb, called by requesting process */
2344 
/* Stage-3 request: resolve the master for a possibly-new rsb, then run
   the request locally or send it to the master node. */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master() fills in the lkb nodeid from r; a positive
	   return means a directory lookup was initiated and the request
	   will be resumed when the lookup reply arrives, so report
	   success for now */

	error = set_master(r, lkb);
	if (error < 0)
		return error;
	if (error > 0)
		return 0;

	if (is_remote(r))
		/* receive_request() runs do_request() on the master */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2367 
2368 /* change some property of an existing lkb, e.g. mode */
2369 
/* Stage-3 convert: dispatch to the master node or run locally. */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() runs do_convert() on the master node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2382 
2383 /* remove an existing lkb from the granted queue */
2384 
/* Stage-3 unlock: dispatch to the master node or run locally. */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() runs do_unlock() on the master node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2397 
2398 /* remove an existing lkb from the convert or wait queue */
2399 
/* Stage-3 cancel: dispatch to the master node or run locally. */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() runs do_cancel() on the master node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2412 
2413 /*
2414  * Four stage 2 varieties:
2415  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2416  */
2417 
/* Stage-2 request: validate args, find or create the named rsb,
   attach the new lkb to it and hand off to stage 3.  The lock id is
   published to the caller via lksb->sb_lkid before the request can
   complete. */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	/* R_CREATE: make the rsb if it doesn't exist yet */
	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}
2445 
/* Stage-2 convert: the lkb already belongs to an rsb; hold and lock it,
   validate against the lkb's current state, then hand off to stage 3. */

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	/* validate under the rsb lock; see validate_lock_args() */
	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2467 
/* Stage-2 unlock: hold and lock the lkb's rsb, validate, then hand off
   to stage 3. */

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2489 
/* Stage-2 cancel: hold and lock the lkb's rsb, validate, then hand off
   to stage 3. */

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2511 
2512 /*
2513  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2514  */
2515 
/*
 * dlm_lock - stage-1 entry for acquiring a new lock or converting an
 * existing one (DLM_LKF_CONVERT).
 *
 * For a new request an lkb is created and its id is returned through
 * lksb->sb_lkid; for a convert the existing lkb is looked up via
 * lksb->sb_lkid.  -EINPROGRESS (queued) is reported to the caller as
 * success; -EAGAIN and -EDEADLK are also returned as 0 here since they
 * are delivered through the completion ast.
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block out recovery for the duration of the operation */
	dlm_lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop find_lkb's reference (convert), or undo create_lkb on a
	   failed request */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2568 
/*
 * dlm_unlock - stage-1 entry for unlocking or cancelling
 * (DLM_LKF_CANCEL) a lock identified by lkid.
 *
 * -DLM_EUNLOCK/-DLM_ECANCEL are the internal success codes from the
 * do_ functions and are reported as 0; -EBUSY is also reported as 0
 * for CANCEL/FORCEUNLOCK because the op has been recorded as an
 * overlap and will complete via the ast.
 */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* block out recovery for the duration of the operation */
	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2610 
2611 /*
2612  * send/receive routines for remote operations and replies
2613  *
2614  * send_args
2615  * send_common
2616  * send_request			receive_request
2617  * send_convert			receive_convert
2618  * send_unlock			receive_unlock
2619  * send_cancel			receive_cancel
2620  * send_grant			receive_grant
2621  * send_bast			receive_bast
2622  * send_lookup			receive_lookup
2623  * send_remove			receive_remove
2624  *
2625  * 				send_common_reply
2626  * receive_request_reply	send_request_reply
2627  * receive_convert_reply	send_convert_reply
2628  * receive_unlock_reply		send_unlock_reply
2629  * receive_cancel_reply		send_cancel_reply
2630  * receive_lookup_reply		send_lookup_reply
2631  */
2632 
/* Allocate a lowcomms buffer for a message to to_nodeid, zero it and
   fill in the common header fields.  On success *mh_ret must later be
   passed to dlm_lowcomms_commit_buffer() (via send_message()).
   Returns 0 or -ENOBUFS. */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2666 
/* Size and allocate a message for the given rsb/lkb.  The variable
   m_extra area holds either the resource name (lookup-style messages)
   or the lvb (lock messages that carry one); receive_extralen() on the
   remote side recovers this length from h_length. */

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		/* these carry the resource name */
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* these may carry an lvb */
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2693 
2694 /* further lowcomms enhancements or alternate implementations may make
2695    the return value from this function useful at some point */
2696 
/* Byte-swap the message for the wire and hand the buffer back to
   lowcomms for transmission.  Always returns 0 (see comment above). */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2703 
/* Copy the lkb's state into an outgoing message, plus the variable
   m_extra payload (resource name or lvb) appropriate for the message
   type.  m_result and m_bastmode are NOT set here; callers fill them
   from their own arguments. */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* only flag which ast callbacks exist; function pointers are
	   meaningless on another node */
	if (lkb->lkb_bastfn)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2747 
/* Send a request/convert/unlock/cancel to the master node.  The lkb is
   put on the waiters list first so the reply can be matched; on any
   send failure it is removed again (using the reply type, which is
   what remove_from_waiters expects). */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2775 
/* Ask the master node to run do_request() for this lkb. */

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
2780 
/* Ask the master node to run do_convert() for this lkb.  A
   down-conversion always succeeds on the master, so no reply is sent;
   fake one locally with the lockspace's stub message so the normal
   reply path runs. */

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2798 
2799 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2800    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2801    that the master is still correct. */
2802 
/* Ask the master node to run do_unlock() for this lkb. */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
2807 
/* Ask the master node to run do_cancel() for this lkb. */

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
2812 
2813 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2814 {
2815 	struct dlm_message *ms;
2816 	struct dlm_mhandle *mh;
2817 	int to_nodeid, error;
2818 
2819 	to_nodeid = lkb->lkb_nodeid;
2820 
2821 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2822 	if (error)
2823 		goto out;
2824 
2825 	send_args(r, lkb, ms);
2826 
2827 	ms->m_result = 0;
2828 
2829 	error = send_message(mh, ms);
2830  out:
2831 	return error;
2832 }
2833 
2834 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2835 {
2836 	struct dlm_message *ms;
2837 	struct dlm_mhandle *mh;
2838 	int to_nodeid, error;
2839 
2840 	to_nodeid = lkb->lkb_nodeid;
2841 
2842 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2843 	if (error)
2844 		goto out;
2845 
2846 	send_args(r, lkb, ms);
2847 
2848 	ms->m_bastmode = mode;
2849 
2850 	error = send_message(mh, ms);
2851  out:
2852 	return error;
2853 }
2854 
/* Ask the directory node which node masters this resource.  The lkb
   waits for the lookup reply like any other message; on a send failure
   it is removed from the waiters list again. */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2882 
2883 static int send_remove(struct dlm_rsb *r)
2884 {
2885 	struct dlm_message *ms;
2886 	struct dlm_mhandle *mh;
2887 	int to_nodeid, error;
2888 
2889 	to_nodeid = dlm_dir_nodeid(r);
2890 
2891 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2892 	if (error)
2893 		goto out;
2894 
2895 	memcpy(ms->m_extra, r->res_name, r->res_length);
2896 	ms->m_hash = r->res_hash;
2897 
2898 	error = send_message(mh, ms);
2899  out:
2900 	return error;
2901 }
2902 
2903 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2904 			     int mstype, int rv)
2905 {
2906 	struct dlm_message *ms;
2907 	struct dlm_mhandle *mh;
2908 	int to_nodeid, error;
2909 
2910 	to_nodeid = lkb->lkb_nodeid;
2911 
2912 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2913 	if (error)
2914 		goto out;
2915 
2916 	send_args(r, lkb, ms);
2917 
2918 	ms->m_result = rv;
2919 
2920 	error = send_message(mh, ms);
2921  out:
2922 	return error;
2923 }
2924 
/* Reply to a received request with result rv. */

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
2929 
/* Reply to a received convert with result rv. */

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
2934 
/* Reply to a received unlock with result rv. */

static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
2939 
/* Reply to a received cancel with result rv. */

static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
2944 
/* Reply to a directory lookup: return the master nodeid (ret_nodeid)
   and result rv to the node that sent ms_in.  Uses the lockspace's
   stub rsb since no real rsb exists on the directory node. */

static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
			     int ret_nodeid, int rv)
{
	struct dlm_rsb *r = &ls->ls_stub_rsb;
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error, nodeid = ms_in->m_header.h_nodeid;

	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
	if (error)
		goto out;

	/* echo the requester's lkid so it can match the reply */
	ms->m_lkid = ms_in->m_lkid;
	ms->m_result = rv;
	ms->m_nodeid = ret_nodeid;

	error = send_message(mh, ms);
 out:
	return error;
}
2965 
2966 /* which args we save from a received message depends heavily on the type
2967    of message, unlike the send side where we can safely send everything about
2968    the lkb for any type of message */
2969 
/* Import flags from a received message.  Only the lower 16 bits of
   lkb_flags travel on the wire; the upper 16 are node-local and are
   preserved. */

static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_exflags = ms->m_exflags;
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
}
2977 
/* Like receive_flags() but for replies: exflags originated locally so
   they are not overwritten. */

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
}
2984 
/* Length of the variable m_extra payload (resource name or lvb)
   carried by a received message; see create_message() for sizing. */

static int receive_extralen(struct dlm_message *ms)
{
	return (ms->m_header.h_length - sizeof(struct dlm_message));
}
2989 
2990 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2991 		       struct dlm_message *ms)
2992 {
2993 	int len;
2994 
2995 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2996 		if (!lkb->lkb_lvbptr)
2997 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2998 		if (!lkb->lkb_lvbptr)
2999 			return -ENOMEM;
3000 		len = receive_extralen(ms);
3001 		if (len > DLM_RESNAME_MAXLEN)
3002 			len = DLM_RESNAME_MAXLEN;
3003 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3004 	}
3005 	return 0;
3006 }
3007 
/* Placeholder bast callback stored in master-copy lkbs so code can
   test lkb_bastfn for non-NULL; it must never actually run. */

static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3012 
/* Placeholder completion-ast callback for master-copy lkbs; it must
   never actually run. */

static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3017 
/* Initialize a freshly created master-copy lkb from a received
   request.  Returns 0 or -ENOMEM (lvb allocation). */

static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	/* record which ast callbacks exist on the owning node; the
	   fake functions are never actually called here */
	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
3039 
/* Update a master-copy lkb from a received convert.  Returns 0,
   -EBUSY (lock not granted, so can't be converted), or -ENOMEM. */

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
3054 
3055 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3056 			       struct dlm_message *ms)
3057 {
3058 	if (receive_lvb(ls, lkb, ms))
3059 		return -ENOMEM;
3060 	return 0;
3061 }
3062 
3063 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3064    uses to send a reply and that the remote end uses to process the reply. */
3065 
/* Fill the lockspace's stub lkb with the sender's nodeid and lkid so
   a failure reply can be sent when no real lkb could be set up. */

static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_remid = ms->m_lkid;
}
3072 
3073 /* This is called after the rsb is locked so that we can safely inspect
3074    fields in the lkb. */
3075 
/* Sanity-check that a received message makes sense for the lkb it was
   matched to: master-bound ops must target a master copy, replies and
   grants/basts must target a process copy, and the sender must be the
   node the lkb points at.  Called with the rsb locked.  Returns 0 or
   -EINVAL (message is logged and ignored). */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		/* nodeid may still be -1 if the lookup hadn't completed
		   when the request was sent */
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}
3116 
/* Handle a request from a process-copy node: create the master-copy
   lkb, find/assert the rsb, run do_request() and send the reply.  On
   setup failure a stub reply carrying the error is sent instead. */

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	/* R_MASTER: we're expected to be the master for this name */
	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	if (error == -EINPROGRESS)
		error = 0;
	/* anything else failed: drop the lkb we just created */
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3162 
/* Handle a convert from a process-copy node: run do_convert() on the
   master copy and reply — except for down-conversions, which the
   sender completes locally without waiting for a reply. */

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out_reply;
	/* sender faked its own reply for a down-conversion */
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out_reply:
	if (reply)
		send_convert_reply(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3202 
/* Handle an unlock from a process-copy node: run do_unlock() on the
   master copy and send the reply; a stub reply is sent if the lkb
   can't be found. */

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out_reply;

	error = do_unlock(r, lkb);
 out_reply:
	send_unlock_reply(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3240 
/* Handle a cancel from a process-copy node: run do_cancel() on the
   master copy and send the reply; a stub reply is sent if the lkb
   can't be found. */

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3274 
3275 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3276 {
3277 	struct dlm_lkb *lkb;
3278 	struct dlm_rsb *r;
3279 	int error;
3280 
3281 	error = find_lkb(ls, ms->m_remid, &lkb);
3282 	if (error) {
3283 		log_debug(ls, "receive_grant from %d no lkb %x",
3284 			  ms->m_header.h_nodeid, ms->m_remid);
3285 		return;
3286 	}
3287 
3288 	r = lkb->lkb_resource;
3289 
3290 	hold_rsb(r);
3291 	lock_rsb(r);
3292 
3293 	error = validate_message(lkb, ms);
3294 	if (error)
3295 		goto out;
3296 
3297 	receive_flags_reply(lkb, ms);
3298 	if (is_altmode(lkb))
3299 		munge_altmode(lkb, ms);
3300 	grant_lock_pc(r, lkb, ms);
3301 	queue_cast(r, lkb, 0);
3302  out:
3303 	unlock_rsb(r);
3304 	put_rsb(r);
3305 	dlm_put_lkb(lkb);
3306 }
3307 
3308 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3309 {
3310 	struct dlm_lkb *lkb;
3311 	struct dlm_rsb *r;
3312 	int error;
3313 
3314 	error = find_lkb(ls, ms->m_remid, &lkb);
3315 	if (error) {
3316 		log_debug(ls, "receive_bast from %d no lkb %x",
3317 			  ms->m_header.h_nodeid, ms->m_remid);
3318 		return;
3319 	}
3320 
3321 	r = lkb->lkb_resource;
3322 
3323 	hold_rsb(r);
3324 	lock_rsb(r);
3325 
3326 	error = validate_message(lkb, ms);
3327 	if (error)
3328 		goto out;
3329 
3330 	queue_bast(r, lkb, ms->m_bastmode);
3331  out:
3332 	unlock_rsb(r);
3333 	put_rsb(r);
3334 	dlm_put_lkb(lkb);
3335 }
3336 
3337 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3338 {
3339 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3340 
3341 	from_nodeid = ms->m_header.h_nodeid;
3342 	our_nodeid = dlm_our_nodeid();
3343 
3344 	len = receive_extralen(ms);
3345 
3346 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3347 	if (dir_nodeid != our_nodeid) {
3348 		log_error(ls, "lookup dir_nodeid %d from %d",
3349 			  dir_nodeid, from_nodeid);
3350 		error = -EINVAL;
3351 		ret_nodeid = -1;
3352 		goto out;
3353 	}
3354 
3355 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3356 
3357 	/* Optimization: we're master so treat lookup as a request */
3358 	if (!error && ret_nodeid == our_nodeid) {
3359 		receive_request(ls, ms);
3360 		return;
3361 	}
3362  out:
3363 	send_lookup_reply(ls, ms, ret_nodeid, error);
3364 }
3365 
3366 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3367 {
3368 	int len, dir_nodeid, from_nodeid;
3369 
3370 	from_nodeid = ms->m_header.h_nodeid;
3371 
3372 	len = receive_extralen(ms);
3373 
3374 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3375 	if (dir_nodeid != dlm_our_nodeid()) {
3376 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3377 			  dir_nodeid, from_nodeid);
3378 		return;
3379 	}
3380 
3381 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3382 }
3383 
/* a purge message: forward to do_purge() with the sender-specified
   nodeid/pid identifying whose locks to purge (no reply is sent) */
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
3388 
/* the master's reply to a DLM_MSG_REQUEST we sent; ms->m_result carries
   the do_request() outcome (granted, queued, or failed).  Also finishes
   any unlock/cancel that overlapped the in-flight request. */

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* remember what op was outstanding before clearing the waiter */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			/* -EINPROGRESS: the master queued us as waiting */
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			/* 0: granted immediately; complete locally */
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			/* redo the whole operation (stage 3 re-resolves
			   the master) */
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* an overlapping unlock/cancel arrived while the request was in
	   flight; now that the request's fate is known, carry it out */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		/* nothing overlapping to do; just clear the flags */
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3492 
/* apply the master's convert result to our process-copy lkb; called with
   the rsb held and locked (see _receive_convert_reply) */
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EDEADLK:
		/* conversion deadlock: revert our process copy and tell
		   the caller via a -EDEADLK completion ast */
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3533 
3534 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3535 {
3536 	struct dlm_rsb *r = lkb->lkb_resource;
3537 	int error;
3538 
3539 	hold_rsb(r);
3540 	lock_rsb(r);
3541 
3542 	error = validate_message(lkb, ms);
3543 	if (error)
3544 		goto out;
3545 
3546 	/* stub reply can happen with waiters_mutex held */
3547 	error = remove_from_waiters_ms(lkb, ms);
3548 	if (error)
3549 		goto out;
3550 
3551 	__receive_convert_reply(r, lkb, ms);
3552  out:
3553 	unlock_rsb(r);
3554 	put_rsb(r);
3555 }
3556 
3557 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3558 {
3559 	struct dlm_lkb *lkb;
3560 	int error;
3561 
3562 	error = find_lkb(ls, ms->m_remid, &lkb);
3563 	if (error) {
3564 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3565 			  ms->m_header.h_nodeid, ms->m_remid);
3566 		return;
3567 	}
3568 
3569 	_receive_convert_reply(lkb, ms);
3570 	dlm_put_lkb(lkb);
3571 }
3572 
3573 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3574 {
3575 	struct dlm_rsb *r = lkb->lkb_resource;
3576 	int error;
3577 
3578 	hold_rsb(r);
3579 	lock_rsb(r);
3580 
3581 	error = validate_message(lkb, ms);
3582 	if (error)
3583 		goto out;
3584 
3585 	/* stub reply can happen with waiters_mutex held */
3586 	error = remove_from_waiters_ms(lkb, ms);
3587 	if (error)
3588 		goto out;
3589 
3590 	/* this is the value returned from do_unlock() on the master */
3591 
3592 	switch (ms->m_result) {
3593 	case -DLM_EUNLOCK:
3594 		receive_flags_reply(lkb, ms);
3595 		remove_lock_pc(r, lkb);
3596 		queue_cast(r, lkb, -DLM_EUNLOCK);
3597 		break;
3598 	case -ENOENT:
3599 		break;
3600 	default:
3601 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3602 			  lkb->lkb_id, ms->m_result);
3603 	}
3604  out:
3605 	unlock_rsb(r);
3606 	put_rsb(r);
3607 }
3608 
3609 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3610 {
3611 	struct dlm_lkb *lkb;
3612 	int error;
3613 
3614 	error = find_lkb(ls, ms->m_remid, &lkb);
3615 	if (error) {
3616 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3617 			  ms->m_header.h_nodeid, ms->m_remid);
3618 		return;
3619 	}
3620 
3621 	_receive_unlock_reply(lkb, ms);
3622 	dlm_put_lkb(lkb);
3623 }
3624 
3625 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3626 {
3627 	struct dlm_rsb *r = lkb->lkb_resource;
3628 	int error;
3629 
3630 	hold_rsb(r);
3631 	lock_rsb(r);
3632 
3633 	error = validate_message(lkb, ms);
3634 	if (error)
3635 		goto out;
3636 
3637 	/* stub reply can happen with waiters_mutex held */
3638 	error = remove_from_waiters_ms(lkb, ms);
3639 	if (error)
3640 		goto out;
3641 
3642 	/* this is the value returned from do_cancel() on the master */
3643 
3644 	switch (ms->m_result) {
3645 	case -DLM_ECANCEL:
3646 		receive_flags_reply(lkb, ms);
3647 		revert_lock_pc(r, lkb);
3648 		queue_cast(r, lkb, -DLM_ECANCEL);
3649 		break;
3650 	case 0:
3651 		break;
3652 	default:
3653 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3654 			  lkb->lkb_id, ms->m_result);
3655 	}
3656  out:
3657 	unlock_rsb(r);
3658 	put_rsb(r);
3659 }
3660 
3661 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3662 {
3663 	struct dlm_lkb *lkb;
3664 	int error;
3665 
3666 	error = find_lkb(ls, ms->m_remid, &lkb);
3667 	if (error) {
3668 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3669 			  ms->m_header.h_nodeid, ms->m_remid);
3670 		return;
3671 	}
3672 
3673 	_receive_cancel_reply(lkb, ms);
3674 	dlm_put_lkb(lkb);
3675 }
3676 
/* the directory node's answer to our DLM_MSG_LOOKUP: record which node
   masters the rsb, then resume the original lock request (or complete an
   overlapping unlock/cancel instead) */
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master; res_nodeid 0 appears to denote the
		   local node here — consistent with is_master() usage */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		/* an unlock/cancel raced with the lookup; complete that
		   instead of sending the request to the master */
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	/* continue the original operation now that the master is known */
	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		/* presumably other lkbs were queued behind this lookup;
		   process them now that we know we're master */
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3728 
/* dispatch one incoming dlm_message to its handler; messages from nodes
   no longer in the lockspace are dropped.  dlm_astd_wake() at the end
   kicks the ast daemon to deliver any callbacks the handlers queued. */
static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_astd_wake();
}
3814 
3815 /* If the lockspace is in recovery mode (locking stopped), then normal
3816    messages are saved on the requestqueue for processing after recovery is
3817    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3818    messages off the requestqueue before we process new ones. This occurs right
3819    after recovery completes when we transition from saving all messages on
3820    requestqueue, to processing all the saved messages, to processing new
3821    messages as they arrive. */
3822 
static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
				int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		/* recovery in progress: park the message for later */
		dlm_add_requestqueue(ls, nodeid, ms);
		return;
	}

	/* let dlm_recoverd drain saved messages before handling new ones */
	dlm_wait_requestqueue(ls);
	_receive_message(ls, ms);
}
3833 
/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue while locking was stopped for recovery. */

void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
{
	_receive_message(ls, ms);
}
3841 
3842 /* This is called by the midcomms layer when something is received for
3843    the lockspace.  It could be either a MSG (normal message sent as part of
3844    standard locking activity) or an RCOM (recovery message sent as part of
3845    lockspace recovery). */
3846 
3847 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3848 {
3849 	struct dlm_header *hd = &p->header;
3850 	struct dlm_ls *ls;
3851 	int type = 0;
3852 
3853 	switch (hd->h_cmd) {
3854 	case DLM_MSG:
3855 		dlm_message_in(&p->message);
3856 		type = p->message.m_type;
3857 		break;
3858 	case DLM_RCOM:
3859 		dlm_rcom_in(&p->rcom);
3860 		type = p->rcom.rc_type;
3861 		break;
3862 	default:
3863 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3864 		return;
3865 	}
3866 
3867 	if (hd->h_nodeid != nodeid) {
3868 		log_print("invalid h_nodeid %d from %d lockspace %x",
3869 			  hd->h_nodeid, nodeid, hd->h_lockspace);
3870 		return;
3871 	}
3872 
3873 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3874 	if (!ls) {
3875 		if (dlm_config.ci_log_debug)
3876 			log_print("invalid lockspace %x from %d cmd %d type %d",
3877 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
3878 
3879 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3880 			dlm_send_ls_not_ready(nodeid, &p->rcom);
3881 		return;
3882 	}
3883 
3884 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3885 	   be inactive (in this ls) before transitioning to recovery mode */
3886 
3887 	down_read(&ls->ls_recv_active);
3888 	if (hd->h_cmd == DLM_MSG)
3889 		dlm_receive_message(ls, &p->message, nodeid);
3890 	else
3891 		dlm_receive_rcom(ls, &p->rcom, nodeid);
3892 	up_read(&ls->ls_recv_active);
3893 
3894 	dlm_put_lockspace(ls);
3895 }
3896 
/* resolve an in-flight convert whose master died by faking a
   DLM_MSG_CONVERT_REPLY locally (the real reply will never arrive) */
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		/* PR<->CW conversion: the true granted mode can't be known
		   until all locks are rebuilt on the rsb, so fake an
		   -EINPROGRESS reply and mark the rsb for
		   recover_conversion */
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		/* up-conversion: resend it after recovery completes */
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3919 
3920 /* A waiting lkb needs recovery if the master node has failed, or
3921    the master node is changing (only when no directory is used) */
3922 
3923 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3924 {
3925 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3926 		return 1;
3927 
3928 	if (!dlm_no_directory(ls))
3929 		return 0;
3930 
3931 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3932 		return 1;
3933 
3934 	return 0;
3935 }
3936 
3937 /* Recovery for locks that are waiting for replies from nodes that are now
3938    gone.  We can just complete unlocks and cancels by faking a reply from the
3939    dead node.  Requests and up-conversions we flag to be resent after
3940    recovery.  Down-conversions can just be completed with a fake reply like
3941    unlocks.  Conversions between PR and CW need special attention. */
3942 
/* walk the waiters list before recovery: flag requests/lookups for
   resend, and complete unlocks/cancels/down-converts by fabricating the
   stub replies that the dead master will never send */
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int wait_type, stub_unlock_result, stub_cancel_result;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			/* resend the request after recovery */
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			/* finish the unlock locally with a faked reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = stub_unlock_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* finish the cancel locally with a faked reply */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = stub_cancel_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
4029 
4030 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4031 {
4032 	struct dlm_lkb *lkb;
4033 	int found = 0;
4034 
4035 	mutex_lock(&ls->ls_waiters_mutex);
4036 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4037 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4038 			hold_lkb(lkb);
4039 			found = 1;
4040 			break;
4041 		}
4042 	}
4043 	mutex_unlock(&ls->ls_waiters_mutex);
4044 
4045 	if (!found)
4046 		lkb = NULL;
4047 	return lkb;
4048 }
4049 
4050 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4051    master or dir-node for r.  Processing the lkb may result in it being placed
4052    back on waiters. */
4053 
4054 /* We do this after normal locking has been enabled and any saved messages
4055    (in requestqueue) have been processed.  We should be confident that at
4056    this point we won't get or process a reply to any of these waiting
4057    operations.  But, new ops may be coming in on the rsbs/locks here from
4058    userspace or remotely. */
4059 
/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.  we
   can be confident here that any replies to either the initial op or overlap
   ops prior to recovery have been received. */
4065 
/* after recovery: take each RESEND-flagged waiter off the waiters list
   and either redo its operation against the (possibly new) master, or
   turn it into the unlock/cancel that overlapped it.  Returns 0, or
   -EINTR if recovery was aborted by another lockspace stop. */
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/* snapshot the pending op and overlap flags before they
		   are cleared below */
		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* the lock never existed remotely; just
				   complete it as unlocked/cancelled */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					/* cancel the convert: lock keeps
					   its granted mode */
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					/* carry out the overlapped unlock */
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			/* redo the original operation from stage 3 */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
4155 
4156 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4157 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4158 {
4159 	struct dlm_ls *ls = r->res_ls;
4160 	struct dlm_lkb *lkb, *safe;
4161 
4162 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4163 		if (test(ls, lkb)) {
4164 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4165 			del_lkb(r, lkb);
4166 			/* this put should free the lkb */
4167 			if (!dlm_put_lkb(lkb))
4168 				log_error(ls, "purged lkb not released");
4169 		}
4170 	}
4171 }
4172 
4173 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4174 {
4175 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4176 }
4177 
/* purge_queue() test: every master-copy lkb, regardless of owner node */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
4182 
/* drop master copies held for nodes that left, from all three queues */
static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}
4189 
/* drop all master-copy lkbs on the rsb, from all three queues */
void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}
4196 
/* Get rid of locks held by nodes that are gone: walk every root rsb we
   master and purge dead master copies.  Always returns 0. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* yield between rsbs; the list can be long */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
4220 
4221 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4222 {
4223 	struct dlm_rsb *r, *r_ret = NULL;
4224 
4225 	read_lock(&ls->ls_rsbtbl[bucket].lock);
4226 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4227 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4228 			continue;
4229 		hold_rsb(r);
4230 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4231 		r_ret = r;
4232 		break;
4233 	}
4234 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
4235 	return r_ret;
4236 }
4237 
4238 void dlm_grant_after_purge(struct dlm_ls *ls)
4239 {
4240 	struct dlm_rsb *r;
4241 	int bucket = 0;
4242 
4243 	while (1) {
4244 		r = find_purged_rsb(ls, bucket);
4245 		if (!r) {
4246 			if (bucket == ls->ls_rsbtbl_size - 1)
4247 				break;
4248 			bucket++;
4249 			continue;
4250 		}
4251 		lock_rsb(r);
4252 		if (is_master(r)) {
4253 			grant_pending_locks(r);
4254 			confirm_master(r, 0);
4255 		}
4256 		unlock_rsb(r);
4257 		put_rsb(r);
4258 		schedule();
4259 	}
4260 }
4261 
4262 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4263 					 uint32_t remid)
4264 {
4265 	struct dlm_lkb *lkb;
4266 
4267 	list_for_each_entry(lkb, head, lkb_statequeue) {
4268 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4269 			return lkb;
4270 	}
4271 	return NULL;
4272 }
4273 
4274 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4275 				    uint32_t remid)
4276 {
4277 	struct dlm_lkb *lkb;
4278 
4279 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4280 	if (lkb)
4281 		return lkb;
4282 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4283 	if (lkb)
4284 		return lkb;
4285 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4286 	if (lkb)
4287 		return lkb;
4288 	return NULL;
4289 }
4290 
/* populate a new master-copy lkb from the rcom_lock payload a recovering
   node sent us; needs at least dlm_rcom + rcom_lock in rc.
   Returns 0, -EINVAL for an oversized lvb, or -ENOMEM. */
static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
	/* keep only the low 16 wire flags, and mark this as a master copy */
	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	/* the real ast/bast callbacks live on the lock's home node; stand-ins
	   record that callbacks exist */
	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lvb length is whatever trails the fixed structures */
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		/* reject an lvb larger than this lockspace allows */
		if (lvblen > ls->ls_lvblen)
			return -EINVAL;
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
	    middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
4335 
4336 /* This lkb may have been recovered in a previous aborted recovery so we need
4337    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4338    If so we just send back a standard reply.  If not, we create a new lkb with
4339    the given values and send back our lkid.  We send back our lkid by sending
4340    back the rcom_lock struct we got but with the remid field filled in. */
4341 
/* Create the master copy of a lock sent to us by a node recovering its
   process-copy locks; needs at least dlm_rcom + rcom_lock in rc_buf.
   The lkid we pick is returned to the lock holder via rl_remid, and the
   outcome via rl_result. */
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* parent/child (hierarchical) locks are not supported here */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	/* a previous aborted recovery may already have added a master
	   copy for this nodeid/lkid; reply with -EEXIST and the existing
	   lkid rather than creating a duplicate (see comment above) */
	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = cpu_to_le32(lkb->lkb_id);

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_debug(ls, "recover_master_copy %d %x", error,
			  le32_to_cpu(rl->rl_lkid));
	rl->rl_result = cpu_to_le32(error);
	return error;
}
4397 
/* Handle the new master's reply to the rcom_lock we sent for one of our
   process-copy locks; needs at least dlm_rcom + rcom_lock in rc_buf. */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x",
				le32_to_cpu(rl->rl_lkid));
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	/* the result the master stored in dlm_recover_master_copy() */
	error = le32_to_cpu(rl->rl_result);

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		/* resend; the eventual reply will come back here again,
		   so don't ack via dlm_recovered_lock() yet */
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		/* save the master's lkid for future messages to it */
		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
4451 
4452 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4453 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4454 		     unsigned long timeout_cs)
4455 {
4456 	struct dlm_lkb *lkb;
4457 	struct dlm_args args;
4458 	int error;
4459 
4460 	dlm_lock_recovery(ls);
4461 
4462 	error = create_lkb(ls, &lkb);
4463 	if (error) {
4464 		kfree(ua);
4465 		goto out;
4466 	}
4467 
4468 	if (flags & DLM_LKF_VALBLK) {
4469 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4470 		if (!ua->lksb.sb_lvbptr) {
4471 			kfree(ua);
4472 			__put_lkb(ls, lkb);
4473 			error = -ENOMEM;
4474 			goto out;
4475 		}
4476 	}
4477 
4478 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4479 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4480 	   lock and that lkb_astparam is the dlm_user_args structure. */
4481 
4482 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4483 			      fake_astfn, ua, fake_bastfn, &args);
4484 	lkb->lkb_flags |= DLM_IFL_USER;
4485 	ua->old_mode = DLM_LOCK_IV;
4486 
4487 	if (error) {
4488 		__put_lkb(ls, lkb);
4489 		goto out;
4490 	}
4491 
4492 	error = request_lock(ls, lkb, name, namelen, &args);
4493 
4494 	switch (error) {
4495 	case 0:
4496 		break;
4497 	case -EINPROGRESS:
4498 		error = 0;
4499 		break;
4500 	case -EAGAIN:
4501 		error = 0;
4502 		/* fall through */
4503 	default:
4504 		__put_lkb(ls, lkb);
4505 		goto out;
4506 	}
4507 
4508 	/* add this new lkb to the per-process list of locks */
4509 	spin_lock(&ua->proc->locks_spin);
4510 	hold_lkb(lkb);
4511 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4512 	spin_unlock(&ua->proc->locks_spin);
4513  out:
4514 	dlm_unlock_recovery(ls);
4515 	return error;
4516 }
4517 
/* Convert (change the mode of) an existing userspace lock.  ua_tmp
   carries the user-supplied values and is always freed here; the lkb
   keeps its long-lived dlm_user_args. */
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	/* refresh the lkb's dlm_user_args from the values passed in */
	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	/* remember the granted mode prior to this convert */
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* these statuses mean the convert is pending or was queued;
	   they are not errors to the caller */
	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4572 
/* Unlock a lock held by a userspace process.  ua_tmp carries the
   user-supplied values and is always freed here. */
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	/* the user may supply a new lvb value with the unlock */
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	/* -DLM_EUNLOCK is the normal completion status */
	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4621 
4622 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4623 		    uint32_t flags, uint32_t lkid)
4624 {
4625 	struct dlm_lkb *lkb;
4626 	struct dlm_args args;
4627 	struct dlm_user_args *ua;
4628 	int error;
4629 
4630 	dlm_lock_recovery(ls);
4631 
4632 	error = find_lkb(ls, lkid, &lkb);
4633 	if (error)
4634 		goto out;
4635 
4636 	ua = lkb->lkb_ua;
4637 	if (ua_tmp->castparam)
4638 		ua->castparam = ua_tmp->castparam;
4639 	ua->user_lksb = ua_tmp->user_lksb;
4640 
4641 	error = set_unlock_args(flags, ua, &args);
4642 	if (error)
4643 		goto out_put;
4644 
4645 	error = cancel_lock(ls, lkb, &args);
4646 
4647 	if (error == -DLM_ECANCEL)
4648 		error = 0;
4649 	/* from validate_unlock_args() */
4650 	if (error == -EBUSY)
4651 		error = 0;
4652  out_put:
4653 	dlm_put_lkb(lkb);
4654  out:
4655 	dlm_unlock_recovery(ls);
4656 	kfree(ua_tmp);
4657 	return error;
4658 }
4659 
/* Cancel a lock to resolve a deadlock; like dlm_user_cancel() except
   DLM_IFL_DEADLOCK_CANCEL must be set while the rsb is locked, so
   cancel_lock() cannot be used directly. */
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	/* -DLM_ECANCEL is the normal completion status */
	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4707 
4708 /* lkb's that are removed from the waiters list by revert are just left on the
4709    orphans list with the granted orphan locks, to be freed by purge */
4710 
4711 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4712 {
4713 	struct dlm_args args;
4714 	int error;
4715 
4716 	hold_lkb(lkb);
4717 	mutex_lock(&ls->ls_orphans_mutex);
4718 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4719 	mutex_unlock(&ls->ls_orphans_mutex);
4720 
4721 	set_unlock_args(0, lkb->lkb_ua, &args);
4722 
4723 	error = cancel_lock(ls, lkb, &args);
4724 	if (error == -DLM_ECANCEL)
4725 		error = 0;
4726 	return error;
4727 }
4728 
4729 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4730    Regardless of what rsb queue the lock is on, it's removed and freed. */
4731 
4732 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4733 {
4734 	struct dlm_args args;
4735 	int error;
4736 
4737 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4738 
4739 	error = unlock_lock(ls, lkb, &args);
4740 	if (error == -DLM_EUNLOCK)
4741 		error = 0;
4742 	return error;
4743 }
4744 
4745 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4746    (which does lock_rsb) due to deadlock with receiving a message that does
4747    lock_rsb followed by dlm_user_add_ast() */
4748 
4749 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4750 				     struct dlm_user_proc *proc)
4751 {
4752 	struct dlm_lkb *lkb = NULL;
4753 
4754 	mutex_lock(&ls->ls_clear_proc_locks);
4755 	if (list_empty(&proc->locks))
4756 		goto out;
4757 
4758 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4759 	list_del_init(&lkb->lkb_ownqueue);
4760 
4761 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4762 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4763 	else
4764 		lkb->lkb_flags |= DLM_IFL_DEAD;
4765  out:
4766 	mutex_unlock(&ls->ls_clear_proc_locks);
4767 	return lkb;
4768 }
4769 
4770 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4771    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4772    which we clear here. */
4773 
4774 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4775    list, and no more device_writes should add lkb's to proc->locks list; so we
4776    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4777    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4778    them ourself. */
4779 
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	/* pop one lock at a time: del_proc_lock() takes and drops
	   ls_clear_proc_locks internally, so the mutex is not held
	   across orphan/unlock_proc_lock() which do lock_rsb (see the
	   deadlock note above) */
	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* drop undelivered ast notifications queued for this proc */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		lkb->lkb_ast_type = 0;
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
4821 
/* Forcibly unlock every lock on proc->locks, then drop in-progress
   unlocks and undelivered asts; lists are protected by the proc
   spinlocks here (compare dlm_clear_proc_locks()). */
static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		/* pop one lkb at a time so locks_spin is not held across
		   unlock_proc_lock(), which does lock_rsb */
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	/* in-progress unlocks */
	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	/* drop undelivered ast notifications queued for this proc */
	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
4859 
4860 /* pid of 0 means purge all orphans */
4861 
4862 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4863 {
4864 	struct dlm_lkb *lkb, *safe;
4865 
4866 	mutex_lock(&ls->ls_orphans_mutex);
4867 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4868 		if (pid && lkb->lkb_ownpid != pid)
4869 			continue;
4870 		unlock_proc_lock(ls, lkb);
4871 		list_del_init(&lkb->lkb_ownqueue);
4872 		dlm_put_lkb(lkb);
4873 	}
4874 	mutex_unlock(&ls->ls_orphans_mutex);
4875 }
4876 
4877 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4878 {
4879 	struct dlm_message *ms;
4880 	struct dlm_mhandle *mh;
4881 	int error;
4882 
4883 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4884 				DLM_MSG_PURGE, &ms, &mh);
4885 	if (error)
4886 		return error;
4887 	ms->m_nodeid = nodeid;
4888 	ms->m_pid = pid;
4889 
4890 	return send_message(mh, ms);
4891 }
4892 
4893 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4894 		   int nodeid, int pid)
4895 {
4896 	int error = 0;
4897 
4898 	if (nodeid != dlm_our_nodeid()) {
4899 		error = send_purge(ls, nodeid, pid);
4900 	} else {
4901 		dlm_lock_recovery(ls);
4902 		if (pid == current->pid)
4903 			purge_proc_locks(ls, proc);
4904 		else
4905 			do_purge(ls, nodeid, pid);
4906 		dlm_unlock_recovery(ls);
4907 	}
4908 	return error;
4909 }
4910 
4911