/* fs/dlm/lock.c (OpenBMC Linux xref, revision 643d1f7f) */
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(); when local, it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
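
/* Illustrative sketch (not dlm code): how a kernel caller enters stage 1
   via the dlm_lock()/dlm_unlock() API from linux/dlm.h.  The lockspace
   handle "ls", the astarg "arg" and the callbacks are hypothetical
   placeholders.

	struct dlm_lksb lksb;
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "res1", 4, 0,
			 comp_ast, arg, blocking_ast);	      (request_lock)
	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, "res1",
			 4, 0, comp_ast, arg, blocking_ast);  (convert_lock)
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, arg);  (unlock_lock)
	error = dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, arg);
							      (cancel_lock)
*/
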
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
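
/* Worked example of the matrix above: a granted PR lock is compatible
   with another PR or CR request but not with EX, so
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1 and
   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0 (dlm_modes_compat()
   is defined below). */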

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD */
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
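
/* Worked examples of the matrix above (row grmode, column rqmode):
   converting PR -> EX gives 1, so the resource LVB is copied back to
   the caller's buffer; converting PW -> NL gives 0, so the caller's
   LVB is written to the resource; any down-conversion from EX likewise
   gives 0. */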

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
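
/* Example reading of the matrix above: NL -> PR is 1 while PR -> NL is
   0, i.e. only strictly "up" conversions are marked, which appears to
   be how QUECVT-eligible conversions are distinguished. */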

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list:\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel, then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
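
/* Example of the status translation above: a lock cancelled because its
   DLM_LKF_TIMEOUT expired completes with sb_status == -ETIMEDOUT rather
   than -DLM_ECANCEL, and a lock cancelled to break a conversion
   deadlock completes with sb_status == -EDEADLK. */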

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = dlm_allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
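
/* Sketch of a typical find_rsb() caller (illustrative only; the real
   callers are the stage-2 xxxx_lock() functions described at the top
   of this file):

	error = find_rsb(ls, name, namelen, R_CREATE, &r);
	if (error)
		return error;
	lock_rsb(r);
	error = _request_lock(r, lkb);
	unlock_rsb(r);
	put_rsb(r);
*/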

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
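
/* Example of the lkid encoding used above: the high 16 bits are the
   randomly chosen hash bucket and the low 16 bits come from that
   bucket's counter, so lkid 0x0003002a lives in bucket 3 with counter
   value 0x2a.  find_lkb() below recovers the bucket with (lkid >> 16). */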

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
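
/* Example of the ordering above: modes sort IV < NL < CR < CW < PR <
   PW < EX, and a new entry is inserted in front of the first entry
   with a lower lkb_rqmode (or at the tail if none is lower).  Adding
   mode PR to a queue whose entries have rqmodes EX, CW, NL yields
   EX, PR, CW, NL. */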

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}
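
/* Example of the aging above: with dlm_config.ci_toss_secs = 10, an
   rsb whose last reference was dropped more than ten seconds ago is
   freed on the next dlm_scan_rsbs() pass, after its directory entry is
   removed if we are its master. */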

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb)) {
		lkb->lkb_timestamp = jiffies;
		return;
	}

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	lkb->lkb_timestamp = jiffies;
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
					  lkb->lkb_timeout_cs * HZ/100))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
					  dlm_config.ci_timewarn_cs * HZ/100))
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	long adj = jiffies - ls->ls_recover_begin;

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp += adj;
	mutex_unlock(&ls->ls_timeout_mutex);
}
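
/* Worked example: if recovery began at jiffies T and the lockspace
   resumes at T + 5*HZ, every timestamp on ls_timeout is advanced by
   5*HZ, so time spent in recovery does not count toward a lock's
   DLM_LKF_TIMEOUT or timewarn interval. */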

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
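
/* Worked example of the rules above: with EX granted on a resource, a
   new PR request fails the grant-queue conflict check and waits.  Once
   the EX lock is unlocked, rule 6-4 applies: the convert queue is
   empty and the PR request is first on the wait queue, so it is
   granted.  A new NL request with DLM_LKF_EXPEDITE would have been
   granted immediately regardless of either queue. */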
1597 
1598 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1599 			  int *err)
1600 {
1601 	int rv;
1602 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1603 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1604 
1605 	if (err)
1606 		*err = 0;
1607 
1608 	rv = _can_be_granted(r, lkb, now);
1609 	if (rv)
1610 		goto out;
1611 
1612 	/*
1613 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1614 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1615 	 * cancels one of the locks.
1616 	 */
1617 
1618 	if (is_convert && can_be_queued(lkb) &&
1619 	    conversion_deadlock_detect(r, lkb)) {
1620 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1621 			lkb->lkb_grmode = DLM_LOCK_NL;
1622 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1623 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1624 			if (err)
1625 				*err = -EDEADLK;
1626 			else {
1627 				log_print("can_be_granted deadlock %x now %d",
1628 					  lkb->lkb_id, now);
1629 				dlm_dump_rsb(r);
1630 			}
1631 		}
1632 		goto out;
1633 	}
1634 
1635 	/*
1636 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1637 	 * to grant a request in a mode other than the normal rqmode.  It's a
1638 	 * simple way to provide a big optimization to applications that can
1639 	 * use them.
1640 	 */
1641 
1642 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1643 		alt = DLM_LOCK_PR;
1644 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1645 		alt = DLM_LOCK_CW;
1646 
1647 	if (alt) {
1648 		lkb->lkb_rqmode = alt;
1649 		rv = _can_be_granted(r, lkb, now);
1650 		if (rv)
1651 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1652 		else
1653 			lkb->lkb_rqmode = rqmode;
1654 	}
1655  out:
1656 	return rv;
1657 }
1658 
1659 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1660    for locks pending on the convert list.  Once verified (watch for these
1661    log_prints), we should be able to just call _can_be_granted() and not
1662    bother with the demote/deadlk cases here (and there's no easy way to deal
1663    with a deadlk here, we'd have to generate something like grant_lock with
1664    the deadlk error.) */
1665 
1666 /* Returns the highest requested mode of all blocked conversions; sets
1667    cw if there's a blocked conversion to DLM_LOCK_CW. */
1668 
1669 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1670 {
1671 	struct dlm_lkb *lkb, *s;
1672 	int hi, demoted, quit, grant_restart, demote_restart;
1673 	int deadlk;
1674 
1675 	quit = 0;
1676  restart:
1677 	grant_restart = 0;
1678 	demote_restart = 0;
1679 	hi = DLM_LOCK_IV;
1680 
1681 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1682 		demoted = is_demoted(lkb);
1683 		deadlk = 0;
1684 
1685 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1686 			grant_lock_pending(r, lkb);
1687 			grant_restart = 1;
1688 			continue;
1689 		}
1690 
1691 		if (!demoted && is_demoted(lkb)) {
1692 			log_print("WARN: pending demoted %x node %d %s",
1693 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1694 			demote_restart = 1;
1695 			continue;
1696 		}
1697 
1698 		if (deadlk) {
1699 			log_print("WARN: pending deadlock %x node %d %s",
1700 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1701 			dlm_dump_rsb(r);
1702 			continue;
1703 		}
1704 
1705 		hi = max_t(int, lkb->lkb_rqmode, hi);
1706 
1707 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1708 			*cw = 1;
1709 	}
1710 
1711 	if (grant_restart)
1712 		goto restart;
1713 	if (demote_restart && !quit) {
1714 		quit = 1;
1715 		goto restart;
1716 	}
1717 
1718 	return max_t(int, high, hi);
1719 }
1720 
1721 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1722 {
1723 	struct dlm_lkb *lkb, *s;
1724 
1725 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1726 		if (can_be_granted(r, lkb, 0, NULL))
1727 			grant_lock_pending(r, lkb);
1728                 else {
1729 			high = max_t(int, lkb->lkb_rqmode, high);
1730 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1731 				*cw = 1;
1732 		}
1733 	}
1734 
1735 	return high;
1736 }
1737 
1738 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1739    on either the convert or waiting queue.
1740    high is the largest rqmode of all locks blocked on the convert or
1741    waiting queue. */
1742 
1743 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1744 {
1745 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1746 		if (gr->lkb_highbast < DLM_LOCK_EX)
1747 			return 1;
1748 		return 0;
1749 	}
1750 
1751 	if (gr->lkb_highbast < high &&
1752 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1753 		return 1;
1754 	return 0;
1755 }
1756 
1757 static void grant_pending_locks(struct dlm_rsb *r)
1758 {
1759 	struct dlm_lkb *lkb, *s;
1760 	int high = DLM_LOCK_IV;
1761 	int cw = 0;
1762 
1763 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1764 
1765 	high = grant_pending_convert(r, high, &cw);
1766 	high = grant_pending_wait(r, high, &cw);
1767 
1768 	if (high == DLM_LOCK_IV)
1769 		return;
1770 
1771 	/*
1772 	 * If there are locks left on the wait/convert queue then send blocking
1773 	 * ASTs to granted locks based on the largest requested mode (high)
1774 	 * found above.
1775 	 */
1776 
1777 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1778 		if (lkb->lkb_bastaddr && lock_requires_bast(lkb, high, cw)) {
1779 			if (cw && high == DLM_LOCK_PR)
1780 				queue_bast(r, lkb, DLM_LOCK_CW);
1781 			else
1782 				queue_bast(r, lkb, high);
1783 			lkb->lkb_highbast = high;
1784 		}
1785 	}
1786 }
1787 
1788 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1789 {
1790 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1791 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1792 		if (gr->lkb_highbast < DLM_LOCK_EX)
1793 			return 1;
1794 		return 0;
1795 	}
1796 
1797 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1798 		return 1;
1799 	return 0;
1800 }
1801 
1802 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1803 			    struct dlm_lkb *lkb)
1804 {
1805 	struct dlm_lkb *gr;
1806 
1807 	list_for_each_entry(gr, head, lkb_statequeue) {
1808 		if (gr->lkb_bastaddr && modes_require_bast(gr, lkb)) {
1809 			queue_bast(r, gr, lkb->lkb_rqmode);
1810 			gr->lkb_highbast = lkb->lkb_rqmode;
1811 		}
1812 	}
1813 }
1814 
1815 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1816 {
1817 	send_bast_queue(r, &r->res_grantqueue, lkb);
1818 }
1819 
1820 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1821 {
1822 	send_bast_queue(r, &r->res_grantqueue, lkb);
1823 	send_bast_queue(r, &r->res_convertqueue, lkb);
1824 }
1825 
1826 /* set_master(r, lkb) -- set the master nodeid of a resource
1827 
1828    The purpose of this function is to set the nodeid field in the given
1829    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1830    known, it can just be copied to the lkb and the function will return
1831    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1832    before it can be copied to the lkb.
1833 
1834    When the rsb nodeid is being looked up remotely, the initial lkb
1835    causing the lookup is kept on the ls_waiters list waiting for the
1836    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1837    on the rsb's res_lookup list until the master is verified.
1838 
1839    Return values:
1840    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1841    1: the rsb master is not available and the lkb has been placed on
1842       a wait queue
1843 */
1844 
1845 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1846 {
1847 	struct dlm_ls *ls = r->res_ls;
1848 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1849 
1850 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1851 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1852 		r->res_first_lkid = lkb->lkb_id;
1853 		lkb->lkb_nodeid = r->res_nodeid;
1854 		return 0;
1855 	}
1856 
1857 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1858 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1859 		return 1;
1860 	}
1861 
1862 	if (r->res_nodeid == 0) {
1863 		lkb->lkb_nodeid = 0;
1864 		return 0;
1865 	}
1866 
1867 	if (r->res_nodeid > 0) {
1868 		lkb->lkb_nodeid = r->res_nodeid;
1869 		return 0;
1870 	}
1871 
1872 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1873 
1874 	dir_nodeid = dlm_dir_nodeid(r);
1875 
1876 	if (dir_nodeid != our_nodeid) {
1877 		r->res_first_lkid = lkb->lkb_id;
1878 		send_lookup(r, lkb);
1879 		return 1;
1880 	}
1881 
1882 	for (i = 0; i < 2; i++) {
1883 		/* It's possible for dlm_scand to remove an old rsb for
1884 		   this same resource from the toss list while we create
1885 		   a new one, look up the master locally, and find that it
1886 		   already exists just before dlm_scand does the
1887 		   dir_remove() on the previous rsb. */
1888 
1889 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1890 				       r->res_length, &ret_nodeid);
1891 		if (!error)
1892 			break;
1893 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1894 		schedule();
1895 	}
1896 	if (error && error != -EEXIST)
1897 		return error;
1898 
1899 	if (ret_nodeid == our_nodeid) {
1900 		r->res_first_lkid = 0;
1901 		r->res_nodeid = 0;
1902 		lkb->lkb_nodeid = 0;
1903 	} else {
1904 		r->res_first_lkid = lkb->lkb_id;
1905 		r->res_nodeid = ret_nodeid;
1906 		lkb->lkb_nodeid = ret_nodeid;
1907 	}
1908 	return 0;
1909 }
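
/* A short sketch of the res_nodeid encoding assumed by set_master():
   0 means this node is the master, a positive value is the nodeid of
   a known remote master, and -1 means the master is unknown and must
   come from the directory node, either by the local dlm_dir_lookup()
   above or by send_lookup(). */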
1910 
1911 static void process_lookup_list(struct dlm_rsb *r)
1912 {
1913 	struct dlm_lkb *lkb, *safe;
1914 
1915 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1916 		list_del_init(&lkb->lkb_rsb_lookup);
1917 		_request_lock(r, lkb);
1918 		schedule();
1919 	}
1920 }
1921 
1922 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1923 
1924 static void confirm_master(struct dlm_rsb *r, int error)
1925 {
1926 	struct dlm_lkb *lkb;
1927 
1928 	if (!r->res_first_lkid)
1929 		return;
1930 
1931 	switch (error) {
1932 	case 0:
1933 	case -EINPROGRESS:
1934 		r->res_first_lkid = 0;
1935 		process_lookup_list(r);
1936 		break;
1937 
1938 	case -EAGAIN:
1939 	case -EBADR:
1940 	case -ENOTBLK:
1941 		/* the remote request failed and won't be retried (it was
1942 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
1943 		   lkb the first_lkid */
1944 
1945 		r->res_first_lkid = 0;
1946 
1947 		if (!list_empty(&r->res_lookup)) {
1948 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1949 					 lkb_rsb_lookup);
1950 			list_del_init(&lkb->lkb_rsb_lookup);
1951 			r->res_first_lkid = lkb->lkb_id;
1952 			_request_lock(r, lkb);
1953 		} else
1954 			r->res_nodeid = -1;
1955 		break;
1956 
1957 	default:
1958 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1959 	}
1960 }
1961 
1962 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1963 			 int namelen, unsigned long timeout_cs, void *ast,
1964 			 void *astarg, void *bast, struct dlm_args *args)
1965 {
1966 	int rv = -EINVAL;
1967 
1968 	/* check for invalid arg usage */
1969 
1970 	if (mode < 0 || mode > DLM_LOCK_EX)
1971 		goto out;
1972 
1973 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1974 		goto out;
1975 
1976 	if (flags & DLM_LKF_CANCEL)
1977 		goto out;
1978 
1979 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1980 		goto out;
1981 
1982 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1983 		goto out;
1984 
1985 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1986 		goto out;
1987 
1988 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1989 		goto out;
1990 
1991 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1992 		goto out;
1993 
1994 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1995 		goto out;
1996 
1997 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1998 		goto out;
1999 
2000 	if (!ast || !lksb)
2001 		goto out;
2002 
2003 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2004 		goto out;
2005 
2006 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2007 		goto out;
2008 
2009 	/* these args will be copied to the lkb in validate_lock_args;
2010 	   it cannot be done now because when converting locks, fields in
2011 	   an active lkb cannot be modified before locking the rsb */
2012 
2013 	args->flags = flags;
2014 	args->astaddr = ast;
2015 	args->astparam = (long) astarg;
2016 	args->bastaddr = bast;
2017 	args->timeout = timeout_cs;
2018 	args->mode = mode;
2019 	args->lksb = lksb;
2020 	rv = 0;
2021  out:
2022 	return rv;
2023 }
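
/* A minimal sketch (not part of the locking paths) of how the checks
   above partition flag usage; the example_* names are illustrative
   only.  EXPEDITE is accepted only on a new NL request, while QUECVT
   and CONVDEADLK require CONVERT. */

static void example_ast(void *astarg)
{
	/* completion ast; a real caller would wake a waiter here */
}

static int example_set_lock_args(struct dlm_lksb *lksb,
				 struct dlm_args *args)
{
	int rv;

	/* accepted: new NL request marked EXPEDITE */
	rv = set_lock_args(DLM_LOCK_NL, lksb, DLM_LKF_EXPEDITE, 8, 0,
			   example_ast, NULL, NULL, args);
	if (rv)
		return rv;

	/* rejected: EXPEDITE on a mode other than NL gives -EINVAL */
	return set_lock_args(DLM_LOCK_EX, lksb, DLM_LKF_EXPEDITE, 8, 0,
			     example_ast, NULL, NULL, args);
}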
2024 
2025 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2026 {
2027 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2028 		      DLM_LKF_FORCEUNLOCK))
2029 		return -EINVAL;
2030 
2031 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2032 		return -EINVAL;
2033 
2034 	args->flags = flags;
2035 	args->astparam = (long) astarg;
2036 	return 0;
2037 }
2038 
2039 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2040 			      struct dlm_args *args)
2041 {
2042 	int rv = -EINVAL;
2043 
2044 	if (args->flags & DLM_LKF_CONVERT) {
2045 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2046 			goto out;
2047 
2048 		if (args->flags & DLM_LKF_QUECVT &&
2049 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2050 			goto out;
2051 
2052 		rv = -EBUSY;
2053 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2054 			goto out;
2055 
2056 		if (lkb->lkb_wait_type)
2057 			goto out;
2058 
2059 		if (is_overlap(lkb))
2060 			goto out;
2061 	}
2062 
2063 	lkb->lkb_exflags = args->flags;
2064 	lkb->lkb_sbflags = 0;
2065 	lkb->lkb_astaddr = args->astaddr;
2066 	lkb->lkb_astparam = args->astparam;
2067 	lkb->lkb_bastaddr = args->bastaddr;
2068 	lkb->lkb_rqmode = args->mode;
2069 	lkb->lkb_lksb = args->lksb;
2070 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2071 	lkb->lkb_ownpid = (int) current->pid;
2072 	lkb->lkb_timeout_cs = args->timeout;
2073 	rv = 0;
2074  out:
2075 	return rv;
2076 }
2077 
2078 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2079    for success */
2080 
2081 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2082    because there may be a lookup in progress and it's valid to do a
2083    cancel/force-unlock on it */
2084 
2085 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2086 {
2087 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2088 	int rv = -EINVAL;
2089 
2090 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2091 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2092 		dlm_print_lkb(lkb);
2093 		goto out;
2094 	}
2095 
2096 	/* an lkb may still exist even though the lock is EOL'ed due to a
2097 	   cancel, unlock or failed noqueue request; an app can't use these
2098 	   locks; return the same error as if the lkid had not been found */
2099 
2100 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2101 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2102 		rv = -ENOENT;
2103 		goto out;
2104 	}
2105 
2106 	/* an lkb may be waiting for an rsb lookup to complete where the
2107 	   lookup was initiated by another lock */
2108 
2109 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2110 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2111 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2112 			list_del_init(&lkb->lkb_rsb_lookup);
2113 			queue_cast(lkb->lkb_resource, lkb,
2114 				   args->flags & DLM_LKF_CANCEL ?
2115 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2116 			unhold_lkb(lkb); /* undoes create_lkb() */
2117 		}
2118 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2119 		rv = -EBUSY;
2120 		goto out;
2121 	}
2122 
2123 	/* cancel not allowed with another cancel/unlock in progress */
2124 
2125 	if (args->flags & DLM_LKF_CANCEL) {
2126 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2127 			goto out;
2128 
2129 		if (is_overlap(lkb))
2130 			goto out;
2131 
2132 		/* don't let scand try to do a cancel */
2133 		del_timeout(lkb);
2134 
2135 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2136 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2137 			rv = -EBUSY;
2138 			goto out;
2139 		}
2140 
2141 		switch (lkb->lkb_wait_type) {
2142 		case DLM_MSG_LOOKUP:
2143 		case DLM_MSG_REQUEST:
2144 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2145 			rv = -EBUSY;
2146 			goto out;
2147 		case DLM_MSG_UNLOCK:
2148 		case DLM_MSG_CANCEL:
2149 			goto out;
2150 		}
2151 		/* add_to_waiters() will set OVERLAP_CANCEL */
2152 		goto out_ok;
2153 	}
2154 
2155 	/* do we need to allow a force-unlock if there's a normal unlock
2156 	   already in progress?  in what conditions could the normal unlock
2157 	   fail such that we'd want to send a force-unlock to be sure? */
2158 
2159 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2160 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2161 			goto out;
2162 
2163 		if (is_overlap_unlock(lkb))
2164 			goto out;
2165 
2166 		/* don't let scand try to do a cancel */
2167 		del_timeout(lkb);
2168 
2169 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2170 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2171 			rv = -EBUSY;
2172 			goto out;
2173 		}
2174 
2175 		switch (lkb->lkb_wait_type) {
2176 		case DLM_MSG_LOOKUP:
2177 		case DLM_MSG_REQUEST:
2178 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2179 			rv = -EBUSY;
2180 			goto out;
2181 		case DLM_MSG_UNLOCK:
2182 			goto out;
2183 		}
2184 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2185 		goto out_ok;
2186 	}
2187 
2188 	/* normal unlock not allowed if there's any op in progress */
2189 	rv = -EBUSY;
2190 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2191 		goto out;
2192 
2193  out_ok:
2194 	/* an overlapping op shouldn't blow away exflags from other op */
2195 	lkb->lkb_exflags |= args->flags;
2196 	lkb->lkb_sbflags = 0;
2197 	lkb->lkb_astparam = args->astparam;
2198 	rv = 0;
2199  out:
2200 	if (rv)
2201 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2202 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2203 			  args->flags, lkb->lkb_wait_type,
2204 			  lkb->lkb_resource->res_name);
2205 	return rv;
2206 }
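
/* Example (sketch): if an app calls dlm_unlock(DLM_LKF_CANCEL) while
   its original request still awaits a reply (wait_type
   DLM_MSG_REQUEST), the checks above set DLM_IFL_OVERLAP_CANCEL and
   return -EBUSY; dlm_unlock() maps that -EBUSY to 0 and the cancel is
   actually sent once the request reply arrives (see
   receive_request_reply()). */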
2207 
2208 /*
2209  * Four stage 4 varieties:
2210  * do_request(), do_convert(), do_unlock(), do_cancel()
2211  * These are called on the master node for the given lock and
2212  * from the central locking logic.
2213  */
2214 
2215 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2216 {
2217 	int error = 0;
2218 
2219 	if (can_be_granted(r, lkb, 1, NULL)) {
2220 		grant_lock(r, lkb);
2221 		queue_cast(r, lkb, 0);
2222 		goto out;
2223 	}
2224 
2225 	if (can_be_queued(lkb)) {
2226 		error = -EINPROGRESS;
2227 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2228 		send_blocking_asts(r, lkb);
2229 		add_timeout(lkb);
2230 		goto out;
2231 	}
2232 
2233 	error = -EAGAIN;
2234 	if (force_blocking_asts(lkb))
2235 		send_blocking_asts_all(r, lkb);
2236 	queue_cast(r, lkb, -EAGAIN);
2237 
2238  out:
2239 	return error;
2240 }
2241 
2242 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2243 {
2244 	int error = 0;
2245 	int deadlk = 0;
2246 
2247 	/* changing an existing lock may allow others to be granted */
2248 
2249 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2250 		grant_lock(r, lkb);
2251 		queue_cast(r, lkb, 0);
2252 		grant_pending_locks(r);
2253 		goto out;
2254 	}
2255 
2256 	/* can_be_granted() detected that this lock would block in a conversion
2257 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2258 	   the ast for the convert. */
2259 
2260 	if (deadlk) {
2261 		/* it's left on the granted queue */
2262 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2263 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2264 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2265 		revert_lock(r, lkb);
2266 		queue_cast(r, lkb, -EDEADLK);
2267 		error = -EDEADLK;
2268 		goto out;
2269 	}
2270 
2271 	/* is_demoted() means the can_be_granted() above set the grmode
2272 	   to NL, and left us on the granted queue.  This auto-demotion
2273 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2274 	   now grantable.  We have to try to grant other converting locks
2275 	   before we try again to grant this one. */
2276 
2277 	if (is_demoted(lkb)) {
2278 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2279 		if (_can_be_granted(r, lkb, 1)) {
2280 			grant_lock(r, lkb);
2281 			queue_cast(r, lkb, 0);
2282 			grant_pending_locks(r);
2283 			goto out;
2284 		}
2285 		/* else fall through and move to convert queue */
2286 	}
2287 
2288 	if (can_be_queued(lkb)) {
2289 		error = -EINPROGRESS;
2290 		del_lkb(r, lkb);
2291 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2292 		send_blocking_asts(r, lkb);
2293 		add_timeout(lkb);
2294 		goto out;
2295 	}
2296 
2297 	error = -EAGAIN;
2298 	if (force_blocking_asts(lkb))
2299 		send_blocking_asts_all(r, lkb);
2300 	queue_cast(r, lkb, -EAGAIN);
2301 
2302  out:
2303 	return error;
2304 }
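
/* Worked example (sketch) of the paths above: two lkbs are granted PR
   and both convert to EX.  Neither conversion can be granted while the
   other still holds PR, a classic conversion deadlock.  With
   DLM_LKF_CONVDEADLK, can_be_granted() demotes a grmode to NL
   (is_demoted()), which may let the other conversion, and perhaps this
   one, proceed; without it the convert ast reports -EDEADLK. */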
2305 
2306 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2307 {
2308 	remove_lock(r, lkb);
2309 	queue_cast(r, lkb, -DLM_EUNLOCK);
2310 	grant_pending_locks(r);
2311 	return -DLM_EUNLOCK;
2312 }
2313 
2314 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2315 
2316 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2317 {
2318 	int error;
2319 
2320 	error = revert_lock(r, lkb);
2321 	if (error) {
2322 		queue_cast(r, lkb, -DLM_ECANCEL);
2323 		grant_pending_locks(r);
2324 		return -DLM_ECANCEL;
2325 	}
2326 	return 0;
2327 }
2328 
2329 /*
2330  * Four stage 3 varieties:
2331  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2332  */
2333 
2334 /* add a new lkb to a possibly new rsb, called by requesting process */
2335 
2336 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2337 {
2338 	int error;
2339 
2340 	/* set_master: sets lkb nodeid from r */
2341 
2342 	error = set_master(r, lkb);
2343 	if (error < 0)
2344 		goto out;
2345 	if (error) {
2346 		error = 0;
2347 		goto out;
2348 	}
2349 
2350 	if (is_remote(r))
2351 		/* receive_request() calls do_request() on remote node */
2352 		error = send_request(r, lkb);
2353 	else
2354 		error = do_request(r, lkb);
2355  out:
2356 	return error;
2357 }
2358 
2359 /* change some property of an existing lkb, e.g. mode */
2360 
2361 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2362 {
2363 	int error;
2364 
2365 	if (is_remote(r))
2366 		/* receive_convert() calls do_convert() on remote node */
2367 		error = send_convert(r, lkb);
2368 	else
2369 		error = do_convert(r, lkb);
2370 
2371 	return error;
2372 }
2373 
2374 /* remove an existing lkb from the granted queue */
2375 
2376 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2377 {
2378 	int error;
2379 
2380 	if (is_remote(r))
2381 		/* receive_unlock() calls do_unlock() on remote node */
2382 		error = send_unlock(r, lkb);
2383 	else
2384 		error = do_unlock(r, lkb);
2385 
2386 	return error;
2387 }
2388 
2389 /* remove an existing lkb from the convert or wait queue */
2390 
2391 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2392 {
2393 	int error;
2394 
2395 	if (is_remote(r))
2396 		/* receive_cancel() calls do_cancel() on remote node */
2397 		error = send_cancel(r, lkb);
2398 	else
2399 		error = do_cancel(r, lkb);
2400 
2401 	return error;
2402 }
2403 
2404 /*
2405  * Four stage 2 varieties:
2406  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2407  */
2408 
2409 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2410 			int len, struct dlm_args *args)
2411 {
2412 	struct dlm_rsb *r;
2413 	int error;
2414 
2415 	error = validate_lock_args(ls, lkb, args);
2416 	if (error)
2417 		goto out;
2418 
2419 	error = find_rsb(ls, name, len, R_CREATE, &r);
2420 	if (error)
2421 		goto out;
2422 
2423 	lock_rsb(r);
2424 
2425 	attach_lkb(r, lkb);
2426 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2427 
2428 	error = _request_lock(r, lkb);
2429 
2430 	unlock_rsb(r);
2431 	put_rsb(r);
2432 
2433  out:
2434 	return error;
2435 }
2436 
2437 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2438 			struct dlm_args *args)
2439 {
2440 	struct dlm_rsb *r;
2441 	int error;
2442 
2443 	r = lkb->lkb_resource;
2444 
2445 	hold_rsb(r);
2446 	lock_rsb(r);
2447 
2448 	error = validate_lock_args(ls, lkb, args);
2449 	if (error)
2450 		goto out;
2451 
2452 	error = _convert_lock(r, lkb);
2453  out:
2454 	unlock_rsb(r);
2455 	put_rsb(r);
2456 	return error;
2457 }
2458 
2459 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2460 		       struct dlm_args *args)
2461 {
2462 	struct dlm_rsb *r;
2463 	int error;
2464 
2465 	r = lkb->lkb_resource;
2466 
2467 	hold_rsb(r);
2468 	lock_rsb(r);
2469 
2470 	error = validate_unlock_args(lkb, args);
2471 	if (error)
2472 		goto out;
2473 
2474 	error = _unlock_lock(r, lkb);
2475  out:
2476 	unlock_rsb(r);
2477 	put_rsb(r);
2478 	return error;
2479 }
2480 
2481 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2482 		       struct dlm_args *args)
2483 {
2484 	struct dlm_rsb *r;
2485 	int error;
2486 
2487 	r = lkb->lkb_resource;
2488 
2489 	hold_rsb(r);
2490 	lock_rsb(r);
2491 
2492 	error = validate_unlock_args(lkb, args);
2493 	if (error)
2494 		goto out;
2495 
2496 	error = _cancel_lock(r, lkb);
2497  out:
2498 	unlock_rsb(r);
2499 	put_rsb(r);
2500 	return error;
2501 }
2502 
2503 /*
2504  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2505  */
2506 
2507 int dlm_lock(dlm_lockspace_t *lockspace,
2508 	     int mode,
2509 	     struct dlm_lksb *lksb,
2510 	     uint32_t flags,
2511 	     void *name,
2512 	     unsigned int namelen,
2513 	     uint32_t parent_lkid,
2514 	     void (*ast) (void *astarg),
2515 	     void *astarg,
2516 	     void (*bast) (void *astarg, int mode))
2517 {
2518 	struct dlm_ls *ls;
2519 	struct dlm_lkb *lkb;
2520 	struct dlm_args args;
2521 	int error, convert = flags & DLM_LKF_CONVERT;
2522 
2523 	ls = dlm_find_lockspace_local(lockspace);
2524 	if (!ls)
2525 		return -EINVAL;
2526 
2527 	dlm_lock_recovery(ls);
2528 
2529 	if (convert)
2530 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2531 	else
2532 		error = create_lkb(ls, &lkb);
2533 
2534 	if (error)
2535 		goto out;
2536 
2537 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2538 			      astarg, bast, &args);
2539 	if (error)
2540 		goto out_put;
2541 
2542 	if (convert)
2543 		error = convert_lock(ls, lkb, &args);
2544 	else
2545 		error = request_lock(ls, lkb, name, namelen, &args);
2546 
2547 	if (error == -EINPROGRESS)
2548 		error = 0;
2549  out_put:
2550 	if (convert || error)
2551 		__put_lkb(ls, lkb);
2552 	if (error == -EAGAIN || error == -EDEADLK)
2553 		error = 0;
2554  out:
2555 	dlm_unlock_recovery(ls);
2556 	dlm_put_lockspace(ls);
2557 	return error;
2558 }
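
/* A minimal usage sketch for dlm_lock(); everything named example_*
   is illustrative rather than part of this file, and it assumes a
   lockspace handle from dlm_new_lockspace() plus
   <linux/completion.h>. */

struct example_lock {
	struct dlm_lksb lksb;
	struct completion done;
};

static void example_lock_ast(void *astarg)
{
	struct example_lock *el = astarg;

	/* runs in ast context; lksb.sb_status now holds the result */
	complete(&el->done);
}

static int example_take_lock(dlm_lockspace_t *lockspace,
			     struct example_lock *el)
{
	int error;

	init_completion(&el->done);
	error = dlm_lock(lockspace, DLM_LOCK_EX, &el->lksb, 0,
			 "example_res", 11, 0, example_lock_ast, el, NULL);
	if (error)
		return error;
	wait_for_completion(&el->done);
	return el->lksb.sb_status;	/* 0 once granted */
}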
2559 
2560 int dlm_unlock(dlm_lockspace_t *lockspace,
2561 	       uint32_t lkid,
2562 	       uint32_t flags,
2563 	       struct dlm_lksb *lksb,
2564 	       void *astarg)
2565 {
2566 	struct dlm_ls *ls;
2567 	struct dlm_lkb *lkb;
2568 	struct dlm_args args;
2569 	int error;
2570 
2571 	ls = dlm_find_lockspace_local(lockspace);
2572 	if (!ls)
2573 		return -EINVAL;
2574 
2575 	dlm_lock_recovery(ls);
2576 
2577 	error = find_lkb(ls, lkid, &lkb);
2578 	if (error)
2579 		goto out;
2580 
2581 	error = set_unlock_args(flags, astarg, &args);
2582 	if (error)
2583 		goto out_put;
2584 
2585 	if (flags & DLM_LKF_CANCEL)
2586 		error = cancel_lock(ls, lkb, &args);
2587 	else
2588 		error = unlock_lock(ls, lkb, &args);
2589 
2590 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2591 		error = 0;
2592 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2593 		error = 0;
2594  out_put:
2595 	dlm_put_lkb(lkb);
2596  out:
2597 	dlm_unlock_recovery(ls);
2598 	dlm_put_lockspace(ls);
2599 	return error;
2600 }
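
/* Companion sketch: a synchronous unlock of the lock taken above.
   The completion ast fires again with sb_status set to -DLM_EUNLOCK,
   while dlm_unlock() itself returns 0. */

static int example_drop_lock(dlm_lockspace_t *lockspace,
			     struct example_lock *el)
{
	int error;

	init_completion(&el->done);
	error = dlm_unlock(lockspace, el->lksb.sb_lkid, 0, &el->lksb, el);
	if (error)
		return error;
	wait_for_completion(&el->done);
	return el->lksb.sb_status == -DLM_EUNLOCK ? 0 : el->lksb.sb_status;
}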
2601 
2602 /*
2603  * send/receive routines for remote operations and replies
2604  *
2605  * send_args
2606  * send_common
2607  * send_request			receive_request
2608  * send_convert			receive_convert
2609  * send_unlock			receive_unlock
2610  * send_cancel			receive_cancel
2611  * send_grant			receive_grant
2612  * send_bast			receive_bast
2613  * send_lookup			receive_lookup
2614  * send_remove			receive_remove
2615  *
2616  * 				send_common_reply
2617  * receive_request_reply	send_request_reply
2618  * receive_convert_reply	send_convert_reply
2619  * receive_unlock_reply		send_unlock_reply
2620  * receive_cancel_reply		send_cancel_reply
2621  * receive_lookup_reply		send_lookup_reply
2622  */
2623 
2624 static int _create_message(struct dlm_ls *ls, int mb_len,
2625 			   int to_nodeid, int mstype,
2626 			   struct dlm_message **ms_ret,
2627 			   struct dlm_mhandle **mh_ret)
2628 {
2629 	struct dlm_message *ms;
2630 	struct dlm_mhandle *mh;
2631 	char *mb;
2632 
2633 	/* get_buffer gives us a message handle (mh) that we need to
2634 	   pass into lowcomms_commit and a message buffer (mb) that we
2635 	   write our data into */
2636 
2637 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2638 	if (!mh)
2639 		return -ENOBUFS;
2640 
2641 	memset(mb, 0, mb_len);
2642 
2643 	ms = (struct dlm_message *) mb;
2644 
2645 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2646 	ms->m_header.h_lockspace = ls->ls_global_id;
2647 	ms->m_header.h_nodeid = dlm_our_nodeid();
2648 	ms->m_header.h_length = mb_len;
2649 	ms->m_header.h_cmd = DLM_MSG;
2650 
2651 	ms->m_type = mstype;
2652 
2653 	*mh_ret = mh;
2654 	*ms_ret = ms;
2655 	return 0;
2656 }
2657 
2658 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2659 			  int to_nodeid, int mstype,
2660 			  struct dlm_message **ms_ret,
2661 			  struct dlm_mhandle **mh_ret)
2662 {
2663 	int mb_len = sizeof(struct dlm_message);
2664 
2665 	switch (mstype) {
2666 	case DLM_MSG_REQUEST:
2667 	case DLM_MSG_LOOKUP:
2668 	case DLM_MSG_REMOVE:
2669 		mb_len += r->res_length;
2670 		break;
2671 	case DLM_MSG_CONVERT:
2672 	case DLM_MSG_UNLOCK:
2673 	case DLM_MSG_REQUEST_REPLY:
2674 	case DLM_MSG_CONVERT_REPLY:
2675 	case DLM_MSG_GRANT:
2676 		if (lkb && lkb->lkb_lvbptr)
2677 			mb_len += r->res_ls->ls_lvblen;
2678 		break;
2679 	}
2680 
2681 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2682 			       ms_ret, mh_ret);
2683 }
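
/* Example (sketch): a DLM_MSG_REQUEST for the 11-byte name
   "example_res" gets mb_len = sizeof(struct dlm_message) + 11, with
   the name copied into m_extra by send_args(); a CONVERT carrying an
   lvb adds ls_lvblen instead.  receive_extralen() later recovers the
   extra length from h_length on the receiving side. */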
2684 
2685 /* further lowcomms enhancements or alternate implementations may make
2686    the return value from this function useful at some point */
2687 
2688 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2689 {
2690 	dlm_message_out(ms);
2691 	dlm_lowcomms_commit_buffer(mh);
2692 	return 0;
2693 }
2694 
2695 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2696 		      struct dlm_message *ms)
2697 {
2698 	ms->m_nodeid   = lkb->lkb_nodeid;
2699 	ms->m_pid      = lkb->lkb_ownpid;
2700 	ms->m_lkid     = lkb->lkb_id;
2701 	ms->m_remid    = lkb->lkb_remid;
2702 	ms->m_exflags  = lkb->lkb_exflags;
2703 	ms->m_sbflags  = lkb->lkb_sbflags;
2704 	ms->m_flags    = lkb->lkb_flags;
2705 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2706 	ms->m_status   = lkb->lkb_status;
2707 	ms->m_grmode   = lkb->lkb_grmode;
2708 	ms->m_rqmode   = lkb->lkb_rqmode;
2709 	ms->m_hash     = r->res_hash;
2710 
2711 	/* m_result and m_bastmode are set from function args,
2712 	   not from lkb fields */
2713 
2714 	if (lkb->lkb_bastaddr)
2715 		ms->m_asts |= AST_BAST;
2716 	if (lkb->lkb_astaddr)
2717 		ms->m_asts |= AST_COMP;
2718 
2719 	/* compare with switch in create_message; send_remove() doesn't
2720 	   use send_args() */
2721 
2722 	switch (ms->m_type) {
2723 	case DLM_MSG_REQUEST:
2724 	case DLM_MSG_LOOKUP:
2725 		memcpy(ms->m_extra, r->res_name, r->res_length);
2726 		break;
2727 	case DLM_MSG_CONVERT:
2728 	case DLM_MSG_UNLOCK:
2729 	case DLM_MSG_REQUEST_REPLY:
2730 	case DLM_MSG_CONVERT_REPLY:
2731 	case DLM_MSG_GRANT:
2732 		if (!lkb->lkb_lvbptr)
2733 			break;
2734 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2735 		break;
2736 	}
2737 }
2738 
2739 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2740 {
2741 	struct dlm_message *ms;
2742 	struct dlm_mhandle *mh;
2743 	int to_nodeid, error;
2744 
2745 	error = add_to_waiters(lkb, mstype);
2746 	if (error)
2747 		return error;
2748 
2749 	to_nodeid = r->res_nodeid;
2750 
2751 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2752 	if (error)
2753 		goto fail;
2754 
2755 	send_args(r, lkb, ms);
2756 
2757 	error = send_message(mh, ms);
2758 	if (error)
2759 		goto fail;
2760 	return 0;
2761 
2762  fail:
2763 	remove_from_waiters(lkb, msg_reply_type(mstype));
2764 	return error;
2765 }
2766 
2767 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2768 {
2769 	return send_common(r, lkb, DLM_MSG_REQUEST);
2770 }
2771 
2772 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2773 {
2774 	int error;
2775 
2776 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2777 
2778 	/* down conversions go without a reply from the master */
2779 	if (!error && down_conversion(lkb)) {
2780 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2781 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2782 		r->res_ls->ls_stub_ms.m_result = 0;
2783 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2784 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2785 	}
2786 
2787 	return error;
2788 }
2789 
2790 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2791    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2792    that the master is still correct. */
2793 
2794 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2795 {
2796 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2797 }
2798 
2799 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2800 {
2801 	return send_common(r, lkb, DLM_MSG_CANCEL);
2802 }
2803 
2804 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2805 {
2806 	struct dlm_message *ms;
2807 	struct dlm_mhandle *mh;
2808 	int to_nodeid, error;
2809 
2810 	to_nodeid = lkb->lkb_nodeid;
2811 
2812 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2813 	if (error)
2814 		goto out;
2815 
2816 	send_args(r, lkb, ms);
2817 
2818 	ms->m_result = 0;
2819 
2820 	error = send_message(mh, ms);
2821  out:
2822 	return error;
2823 }
2824 
2825 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2826 {
2827 	struct dlm_message *ms;
2828 	struct dlm_mhandle *mh;
2829 	int to_nodeid, error;
2830 
2831 	to_nodeid = lkb->lkb_nodeid;
2832 
2833 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2834 	if (error)
2835 		goto out;
2836 
2837 	send_args(r, lkb, ms);
2838 
2839 	ms->m_bastmode = mode;
2840 
2841 	error = send_message(mh, ms);
2842  out:
2843 	return error;
2844 }
2845 
2846 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2847 {
2848 	struct dlm_message *ms;
2849 	struct dlm_mhandle *mh;
2850 	int to_nodeid, error;
2851 
2852 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2853 	if (error)
2854 		return error;
2855 
2856 	to_nodeid = dlm_dir_nodeid(r);
2857 
2858 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2859 	if (error)
2860 		goto fail;
2861 
2862 	send_args(r, lkb, ms);
2863 
2864 	error = send_message(mh, ms);
2865 	if (error)
2866 		goto fail;
2867 	return 0;
2868 
2869  fail:
2870 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2871 	return error;
2872 }
2873 
2874 static int send_remove(struct dlm_rsb *r)
2875 {
2876 	struct dlm_message *ms;
2877 	struct dlm_mhandle *mh;
2878 	int to_nodeid, error;
2879 
2880 	to_nodeid = dlm_dir_nodeid(r);
2881 
2882 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2883 	if (error)
2884 		goto out;
2885 
2886 	memcpy(ms->m_extra, r->res_name, r->res_length);
2887 	ms->m_hash = r->res_hash;
2888 
2889 	error = send_message(mh, ms);
2890  out:
2891 	return error;
2892 }
2893 
2894 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2895 			     int mstype, int rv)
2896 {
2897 	struct dlm_message *ms;
2898 	struct dlm_mhandle *mh;
2899 	int to_nodeid, error;
2900 
2901 	to_nodeid = lkb->lkb_nodeid;
2902 
2903 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2904 	if (error)
2905 		goto out;
2906 
2907 	send_args(r, lkb, ms);
2908 
2909 	ms->m_result = rv;
2910 
2911 	error = send_message(mh, ms);
2912  out:
2913 	return error;
2914 }
2915 
2916 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2917 {
2918 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2919 }
2920 
2921 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2922 {
2923 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2924 }
2925 
2926 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2927 {
2928 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2929 }
2930 
2931 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2932 {
2933 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2934 }
2935 
2936 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2937 			     int ret_nodeid, int rv)
2938 {
2939 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2940 	struct dlm_message *ms;
2941 	struct dlm_mhandle *mh;
2942 	int error, nodeid = ms_in->m_header.h_nodeid;
2943 
2944 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2945 	if (error)
2946 		goto out;
2947 
2948 	ms->m_lkid = ms_in->m_lkid;
2949 	ms->m_result = rv;
2950 	ms->m_nodeid = ret_nodeid;
2951 
2952 	error = send_message(mh, ms);
2953  out:
2954 	return error;
2955 }
2956 
2957 /* which args we save from a received message depends heavily on the type
2958    of message, unlike the send side where we can safely send everything about
2959    the lkb for any type of message */
2960 
2961 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2962 {
2963 	lkb->lkb_exflags = ms->m_exflags;
2964 	lkb->lkb_sbflags = ms->m_sbflags;
2965 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2966 		         (ms->m_flags & 0x0000FFFF);
2967 }
2968 
2969 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2970 {
2971 	lkb->lkb_sbflags = ms->m_sbflags;
2972 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2973 		         (ms->m_flags & 0x0000FFFF);
2974 }
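
/* A note on the masking above (sketch): the low 16 flag bits (e.g.
   DLM_IFL_USER) are meaningful between nodes and are taken from the
   message, while the high 16 bits (DLM_IFL_MSTCPY, DLM_IFL_RESEND and
   friends) are node-local state that a received message must not
   disturb. */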
2975 
2976 static int receive_extralen(struct dlm_message *ms)
2977 {
2978 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2979 }
2980 
2981 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2982 		       struct dlm_message *ms)
2983 {
2984 	int len;
2985 
2986 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2987 		if (!lkb->lkb_lvbptr)
2988 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2989 		if (!lkb->lkb_lvbptr)
2990 			return -ENOMEM;
2991 		len = receive_extralen(ms);
2992 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2993 	}
2994 	return 0;
2995 }
2996 
2997 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2998 				struct dlm_message *ms)
2999 {
3000 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3001 	lkb->lkb_ownpid = ms->m_pid;
3002 	lkb->lkb_remid = ms->m_lkid;
3003 	lkb->lkb_grmode = DLM_LOCK_IV;
3004 	lkb->lkb_rqmode = ms->m_rqmode;
3005 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
3006 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
3007 
3008 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3009 		/* lkb was just created so there won't be an lvb yet */
3010 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3011 		if (!lkb->lkb_lvbptr)
3012 			return -ENOMEM;
3013 	}
3014 
3015 	return 0;
3016 }
3017 
3018 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3019 				struct dlm_message *ms)
3020 {
3021 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3022 		return -EBUSY;
3023 
3024 	if (receive_lvb(ls, lkb, ms))
3025 		return -ENOMEM;
3026 
3027 	lkb->lkb_rqmode = ms->m_rqmode;
3028 	lkb->lkb_lvbseq = ms->m_lvbseq;
3029 
3030 	return 0;
3031 }
3032 
3033 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3034 			       struct dlm_message *ms)
3035 {
3036 	if (receive_lvb(ls, lkb, ms))
3037 		return -ENOMEM;
3038 	return 0;
3039 }
3040 
3041 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3042    uses to send a reply and that the remote end uses to process the reply. */
3043 
3044 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3045 {
3046 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3047 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3048 	lkb->lkb_remid = ms->m_lkid;
3049 }
3050 
3051 /* This is called after the rsb is locked so that we can safely inspect
3052    fields in the lkb. */
3053 
3054 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3055 {
3056 	int from = ms->m_header.h_nodeid;
3057 	int error = 0;
3058 
3059 	switch (ms->m_type) {
3060 	case DLM_MSG_CONVERT:
3061 	case DLM_MSG_UNLOCK:
3062 	case DLM_MSG_CANCEL:
3063 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3064 			error = -EINVAL;
3065 		break;
3066 
3067 	case DLM_MSG_CONVERT_REPLY:
3068 	case DLM_MSG_UNLOCK_REPLY:
3069 	case DLM_MSG_CANCEL_REPLY:
3070 	case DLM_MSG_GRANT:
3071 	case DLM_MSG_BAST:
3072 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3073 			error = -EINVAL;
3074 		break;
3075 
3076 	case DLM_MSG_REQUEST_REPLY:
3077 		if (!is_process_copy(lkb))
3078 			error = -EINVAL;
3079 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3080 			error = -EINVAL;
3081 		break;
3082 
3083 	default:
3084 		error = -EINVAL;
3085 	}
3086 
3087 	if (error)
3088 		log_error(lkb->lkb_resource->res_ls,
3089 			  "ignore invalid message %d from %d %x %x %x %d",
3090 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3091 			  lkb->lkb_flags, lkb->lkb_nodeid);
3092 	return error;
3093 }
3094 
3095 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3096 {
3097 	struct dlm_lkb *lkb;
3098 	struct dlm_rsb *r;
3099 	int error, namelen;
3100 
3101 	error = create_lkb(ls, &lkb);
3102 	if (error)
3103 		goto fail;
3104 
3105 	receive_flags(lkb, ms);
3106 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3107 	error = receive_request_args(ls, lkb, ms);
3108 	if (error) {
3109 		__put_lkb(ls, lkb);
3110 		goto fail;
3111 	}
3112 
3113 	namelen = receive_extralen(ms);
3114 
3115 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3116 	if (error) {
3117 		__put_lkb(ls, lkb);
3118 		goto fail;
3119 	}
3120 
3121 	lock_rsb(r);
3122 
3123 	attach_lkb(r, lkb);
3124 	error = do_request(r, lkb);
3125 	send_request_reply(r, lkb, error);
3126 
3127 	unlock_rsb(r);
3128 	put_rsb(r);
3129 
3130 	if (error == -EINPROGRESS)
3131 		error = 0;
3132 	if (error)
3133 		dlm_put_lkb(lkb);
3134 	return;
3135 
3136  fail:
3137 	setup_stub_lkb(ls, ms);
3138 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3139 }
3140 
3141 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3142 {
3143 	struct dlm_lkb *lkb;
3144 	struct dlm_rsb *r;
3145 	int error, reply = 1;
3146 
3147 	error = find_lkb(ls, ms->m_remid, &lkb);
3148 	if (error)
3149 		goto fail;
3150 
3151 	r = lkb->lkb_resource;
3152 
3153 	hold_rsb(r);
3154 	lock_rsb(r);
3155 
3156 	error = validate_message(lkb, ms);
3157 	if (error)
3158 		goto out;
3159 
3160 	receive_flags(lkb, ms);
3161 	error = receive_convert_args(ls, lkb, ms);
3162 	if (error)
3163 		goto out_reply;
3164 	reply = !down_conversion(lkb);
3165 
3166 	error = do_convert(r, lkb);
3167  out_reply:
3168 	if (reply)
3169 		send_convert_reply(r, lkb, error);
3170  out:
3171 	unlock_rsb(r);
3172 	put_rsb(r);
3173 	dlm_put_lkb(lkb);
3174 	return;
3175 
3176  fail:
3177 	setup_stub_lkb(ls, ms);
3178 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3179 }
3180 
3181 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3182 {
3183 	struct dlm_lkb *lkb;
3184 	struct dlm_rsb *r;
3185 	int error;
3186 
3187 	error = find_lkb(ls, ms->m_remid, &lkb);
3188 	if (error)
3189 		goto fail;
3190 
3191 	r = lkb->lkb_resource;
3192 
3193 	hold_rsb(r);
3194 	lock_rsb(r);
3195 
3196 	error = validate_message(lkb, ms);
3197 	if (error)
3198 		goto out;
3199 
3200 	receive_flags(lkb, ms);
3201 	error = receive_unlock_args(ls, lkb, ms);
3202 	if (error)
3203 		goto out_reply;
3204 
3205 	error = do_unlock(r, lkb);
3206  out_reply:
3207 	send_unlock_reply(r, lkb, error);
3208  out:
3209 	unlock_rsb(r);
3210 	put_rsb(r);
3211 	dlm_put_lkb(lkb);
3212 	return;
3213 
3214  fail:
3215 	setup_stub_lkb(ls, ms);
3216 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3217 }
3218 
3219 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3220 {
3221 	struct dlm_lkb *lkb;
3222 	struct dlm_rsb *r;
3223 	int error;
3224 
3225 	error = find_lkb(ls, ms->m_remid, &lkb);
3226 	if (error)
3227 		goto fail;
3228 
3229 	receive_flags(lkb, ms);
3230 
3231 	r = lkb->lkb_resource;
3232 
3233 	hold_rsb(r);
3234 	lock_rsb(r);
3235 
3236 	error = validate_message(lkb, ms);
3237 	if (error)
3238 		goto out;
3239 
3240 	error = do_cancel(r, lkb);
3241 	send_cancel_reply(r, lkb, error);
3242  out:
3243 	unlock_rsb(r);
3244 	put_rsb(r);
3245 	dlm_put_lkb(lkb);
3246 	return;
3247 
3248  fail:
3249 	setup_stub_lkb(ls, ms);
3250 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3251 }
3252 
3253 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3254 {
3255 	struct dlm_lkb *lkb;
3256 	struct dlm_rsb *r;
3257 	int error;
3258 
3259 	error = find_lkb(ls, ms->m_remid, &lkb);
3260 	if (error) {
3261 		log_debug(ls, "receive_grant from %d no lkb %x",
3262 			  ms->m_header.h_nodeid, ms->m_remid);
3263 		return;
3264 	}
3265 
3266 	r = lkb->lkb_resource;
3267 
3268 	hold_rsb(r);
3269 	lock_rsb(r);
3270 
3271 	error = validate_message(lkb, ms);
3272 	if (error)
3273 		goto out;
3274 
3275 	receive_flags_reply(lkb, ms);
3276 	if (is_altmode(lkb))
3277 		munge_altmode(lkb, ms);
3278 	grant_lock_pc(r, lkb, ms);
3279 	queue_cast(r, lkb, 0);
3280  out:
3281 	unlock_rsb(r);
3282 	put_rsb(r);
3283 	dlm_put_lkb(lkb);
3284 }
3285 
3286 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3287 {
3288 	struct dlm_lkb *lkb;
3289 	struct dlm_rsb *r;
3290 	int error;
3291 
3292 	error = find_lkb(ls, ms->m_remid, &lkb);
3293 	if (error) {
3294 		log_debug(ls, "receive_bast from %d no lkb %x",
3295 			  ms->m_header.h_nodeid, ms->m_remid);
3296 		return;
3297 	}
3298 
3299 	r = lkb->lkb_resource;
3300 
3301 	hold_rsb(r);
3302 	lock_rsb(r);
3303 
3304 	error = validate_message(lkb, ms);
3305 	if (error)
3306 		goto out;
3307 
3308 	queue_bast(r, lkb, ms->m_bastmode);
3309  out:
3310 	unlock_rsb(r);
3311 	put_rsb(r);
3312 	dlm_put_lkb(lkb);
3313 }
3314 
3315 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3316 {
3317 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3318 
3319 	from_nodeid = ms->m_header.h_nodeid;
3320 	our_nodeid = dlm_our_nodeid();
3321 
3322 	len = receive_extralen(ms);
3323 
3324 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3325 	if (dir_nodeid != our_nodeid) {
3326 		log_error(ls, "lookup dir_nodeid %d from %d",
3327 			  dir_nodeid, from_nodeid);
3328 		error = -EINVAL;
3329 		ret_nodeid = -1;
3330 		goto out;
3331 	}
3332 
3333 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3334 
3335 	/* Optimization: we're master so treat lookup as a request */
3336 	if (!error && ret_nodeid == our_nodeid) {
3337 		receive_request(ls, ms);
3338 		return;
3339 	}
3340  out:
3341 	send_lookup_reply(ls, ms, ret_nodeid, error);
3342 }
3343 
3344 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3345 {
3346 	int len, dir_nodeid, from_nodeid;
3347 
3348 	from_nodeid = ms->m_header.h_nodeid;
3349 
3350 	len = receive_extralen(ms);
3351 
3352 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3353 	if (dir_nodeid != dlm_our_nodeid()) {
3354 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3355 			  dir_nodeid, from_nodeid);
3356 		return;
3357 	}
3358 
3359 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3360 }
3361 
3362 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3363 {
3364 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3365 }
3366 
3367 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3368 {
3369 	struct dlm_lkb *lkb;
3370 	struct dlm_rsb *r;
3371 	int error, mstype, result;
3372 
3373 	error = find_lkb(ls, ms->m_remid, &lkb);
3374 	if (error) {
3375 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3376 			  ms->m_header.h_nodeid, ms->m_remid);
3377 		return;
3378 	}
3379 
3380 	r = lkb->lkb_resource;
3381 	hold_rsb(r);
3382 	lock_rsb(r);
3383 
3384 	error = validate_message(lkb, ms);
3385 	if (error)
3386 		goto out;
3387 
3388 	mstype = lkb->lkb_wait_type;
3389 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3390 	if (error)
3391 		goto out;
3392 
3393 	/* Optimization: the dir node was also the master, so it took our
3394 	   lookup as a request and sent a request reply, not a lookup reply */
3395 	if (mstype == DLM_MSG_LOOKUP) {
3396 		r->res_nodeid = ms->m_header.h_nodeid;
3397 		lkb->lkb_nodeid = r->res_nodeid;
3398 	}
3399 
3400 	/* this is the value returned from do_request() on the master */
3401 	result = ms->m_result;
3402 
3403 	switch (result) {
3404 	case -EAGAIN:
3405 		/* request would block (be queued) on remote master */
3406 		queue_cast(r, lkb, -EAGAIN);
3407 		confirm_master(r, -EAGAIN);
3408 		unhold_lkb(lkb); /* undoes create_lkb() */
3409 		break;
3410 
3411 	case -EINPROGRESS:
3412 	case 0:
3413 		/* request was queued or granted on remote master */
3414 		receive_flags_reply(lkb, ms);
3415 		lkb->lkb_remid = ms->m_lkid;
3416 		if (is_altmode(lkb))
3417 			munge_altmode(lkb, ms);
3418 		if (result) {
3419 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3420 			add_timeout(lkb);
3421 		} else {
3422 			grant_lock_pc(r, lkb, ms);
3423 			queue_cast(r, lkb, 0);
3424 		}
3425 		confirm_master(r, result);
3426 		break;
3427 
3428 	case -EBADR:
3429 	case -ENOTBLK:
3430 		/* find_rsb failed to find rsb or rsb wasn't master */
3431 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3432 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3433 		r->res_nodeid = -1;
3434 		lkb->lkb_nodeid = -1;
3435 
3436 		if (is_overlap(lkb)) {
3437 			/* we'll ignore error in cancel/unlock reply */
3438 			queue_cast_overlap(r, lkb);
3439 			confirm_master(r, result);
3440 			unhold_lkb(lkb); /* undoes create_lkb() */
3441 		} else
3442 			_request_lock(r, lkb);
3443 		break;
3444 
3445 	default:
3446 		log_error(ls, "receive_request_reply %x error %d",
3447 			  lkb->lkb_id, result);
3448 	}
3449 
3450 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3451 		log_debug(ls, "receive_request_reply %x result %d unlock",
3452 			  lkb->lkb_id, result);
3453 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3454 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3455 		send_unlock(r, lkb);
3456 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3457 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3458 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3459 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3460 		send_cancel(r, lkb);
3461 	} else {
3462 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3463 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3464 	}
3465  out:
3466 	unlock_rsb(r);
3467 	put_rsb(r);
3468 	dlm_put_lkb(lkb);
3469 }
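
/* Example (sketch) of the overlap handling above: the app requested a
   lock, then called dlm_unlock() before the master replied, so
   DLM_IFL_OVERLAP_UNLOCK was set (see validate_unlock_args()).  When
   the request reply then reports the lock granted or queued, the
   deferred unlock is finally issued with send_unlock(). */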
3470 
3471 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3472 				    struct dlm_message *ms)
3473 {
3474 	/* this is the value returned from do_convert() on the master */
3475 	switch (ms->m_result) {
3476 	case -EAGAIN:
3477 		/* convert would block (be queued) on remote master */
3478 		queue_cast(r, lkb, -EAGAIN);
3479 		break;
3480 
3481 	case -EDEADLK:
3482 		receive_flags_reply(lkb, ms);
3483 		revert_lock_pc(r, lkb);
3484 		queue_cast(r, lkb, -EDEADLK);
3485 		break;
3486 
3487 	case -EINPROGRESS:
3488 		/* convert was queued on remote master */
3489 		receive_flags_reply(lkb, ms);
3490 		if (is_demoted(lkb))
3491 			munge_demoted(lkb, ms);
3492 		del_lkb(r, lkb);
3493 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3494 		add_timeout(lkb);
3495 		break;
3496 
3497 	case 0:
3498 		/* convert was granted on remote master */
3499 		receive_flags_reply(lkb, ms);
3500 		if (is_demoted(lkb))
3501 			munge_demoted(lkb, ms);
3502 		grant_lock_pc(r, lkb, ms);
3503 		queue_cast(r, lkb, 0);
3504 		break;
3505 
3506 	default:
3507 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3508 			  lkb->lkb_id, ms->m_result);
3509 	}
3510 }
3511 
3512 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3513 {
3514 	struct dlm_rsb *r = lkb->lkb_resource;
3515 	int error;
3516 
3517 	hold_rsb(r);
3518 	lock_rsb(r);
3519 
3520 	error = validate_message(lkb, ms);
3521 	if (error)
3522 		goto out;
3523 
3524 	/* stub reply can happen with waiters_mutex held */
3525 	error = remove_from_waiters_ms(lkb, ms);
3526 	if (error)
3527 		goto out;
3528 
3529 	__receive_convert_reply(r, lkb, ms);
3530  out:
3531 	unlock_rsb(r);
3532 	put_rsb(r);
3533 }
3534 
3535 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3536 {
3537 	struct dlm_lkb *lkb;
3538 	int error;
3539 
3540 	error = find_lkb(ls, ms->m_remid, &lkb);
3541 	if (error) {
3542 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3543 			  ms->m_header.h_nodeid, ms->m_remid);
3544 		return;
3545 	}
3546 
3547 	_receive_convert_reply(lkb, ms);
3548 	dlm_put_lkb(lkb);
3549 }
3550 
3551 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3552 {
3553 	struct dlm_rsb *r = lkb->lkb_resource;
3554 	int error;
3555 
3556 	hold_rsb(r);
3557 	lock_rsb(r);
3558 
3559 	error = validate_message(lkb, ms);
3560 	if (error)
3561 		goto out;
3562 
3563 	/* stub reply can happen with waiters_mutex held */
3564 	error = remove_from_waiters_ms(lkb, ms);
3565 	if (error)
3566 		goto out;
3567 
3568 	/* this is the value returned from do_unlock() on the master */
3569 
3570 	switch (ms->m_result) {
3571 	case -DLM_EUNLOCK:
3572 		receive_flags_reply(lkb, ms);
3573 		remove_lock_pc(r, lkb);
3574 		queue_cast(r, lkb, -DLM_EUNLOCK);
3575 		break;
3576 	case -ENOENT:
3577 		break;
3578 	default:
3579 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3580 			  lkb->lkb_id, ms->m_result);
3581 	}
3582  out:
3583 	unlock_rsb(r);
3584 	put_rsb(r);
3585 }
3586 
3587 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3588 {
3589 	struct dlm_lkb *lkb;
3590 	int error;
3591 
3592 	error = find_lkb(ls, ms->m_remid, &lkb);
3593 	if (error) {
3594 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3595 			  ms->m_header.h_nodeid, ms->m_remid);
3596 		return;
3597 	}
3598 
3599 	_receive_unlock_reply(lkb, ms);
3600 	dlm_put_lkb(lkb);
3601 }
3602 
3603 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3604 {
3605 	struct dlm_rsb *r = lkb->lkb_resource;
3606 	int error;
3607 
3608 	hold_rsb(r);
3609 	lock_rsb(r);
3610 
3611 	error = validate_message(lkb, ms);
3612 	if (error)
3613 		goto out;
3614 
3615 	/* stub reply can happen with waiters_mutex held */
3616 	error = remove_from_waiters_ms(lkb, ms);
3617 	if (error)
3618 		goto out;
3619 
3620 	/* this is the value returned from do_cancel() on the master */
3621 
3622 	switch (ms->m_result) {
3623 	case -DLM_ECANCEL:
3624 		receive_flags_reply(lkb, ms);
3625 		revert_lock_pc(r, lkb);
3626 		queue_cast(r, lkb, -DLM_ECANCEL);
3627 		break;
3628 	case 0:
3629 		break;
3630 	default:
3631 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3632 			  lkb->lkb_id, ms->m_result);
3633 	}
3634  out:
3635 	unlock_rsb(r);
3636 	put_rsb(r);
3637 }
3638 
3639 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3640 {
3641 	struct dlm_lkb *lkb;
3642 	int error;
3643 
3644 	error = find_lkb(ls, ms->m_remid, &lkb);
3645 	if (error) {
3646 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3647 			  ms->m_header.h_nodeid, ms->m_remid);
3648 		return;
3649 	}
3650 
3651 	_receive_cancel_reply(lkb, ms);
3652 	dlm_put_lkb(lkb);
3653 }
3654 
3655 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3656 {
3657 	struct dlm_lkb *lkb;
3658 	struct dlm_rsb *r;
3659 	int error, ret_nodeid;
3660 
3661 	error = find_lkb(ls, ms->m_lkid, &lkb);
3662 	if (error) {
3663 		log_error(ls, "receive_lookup_reply no lkb");
3664 		return;
3665 	}
3666 
3667 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3668 	   FIXME: will a non-zero error ever be returned? */
3669 
3670 	r = lkb->lkb_resource;
3671 	hold_rsb(r);
3672 	lock_rsb(r);
3673 
3674 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3675 	if (error)
3676 		goto out;
3677 
3678 	ret_nodeid = ms->m_nodeid;
3679 	if (ret_nodeid == dlm_our_nodeid()) {
3680 		r->res_nodeid = 0;
3681 		ret_nodeid = 0;
3682 		r->res_first_lkid = 0;
3683 	} else {
3684 		/* set_master() will copy res_nodeid to lkb_nodeid */
3685 		r->res_nodeid = ret_nodeid;
3686 	}
3687 
3688 	if (is_overlap(lkb)) {
3689 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3690 			  lkb->lkb_id, lkb->lkb_flags);
3691 		queue_cast_overlap(r, lkb);
3692 		unhold_lkb(lkb); /* undoes create_lkb() */
3693 		goto out_list;
3694 	}
3695 
3696 	_request_lock(r, lkb);
3697 
3698  out_list:
3699 	if (!ret_nodeid)
3700 		process_lookup_list(r);
3701  out:
3702 	unlock_rsb(r);
3703 	put_rsb(r);
3704 	dlm_put_lkb(lkb);
3705 }
3706 
3707 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3708 {
3709 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3710 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3711 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3712 			  ms->m_remid, ms->m_result);
3713 		return;
3714 	}
3715 
3716 	switch (ms->m_type) {
3717 
3718 	/* messages sent to a master node */
3719 
3720 	case DLM_MSG_REQUEST:
3721 		receive_request(ls, ms);
3722 		break;
3723 
3724 	case DLM_MSG_CONVERT:
3725 		receive_convert(ls, ms);
3726 		break;
3727 
3728 	case DLM_MSG_UNLOCK:
3729 		receive_unlock(ls, ms);
3730 		break;
3731 
3732 	case DLM_MSG_CANCEL:
3733 		receive_cancel(ls, ms);
3734 		break;
3735 
3736 	/* messages sent from a master node (replies to above) */
3737 
3738 	case DLM_MSG_REQUEST_REPLY:
3739 		receive_request_reply(ls, ms);
3740 		break;
3741 
3742 	case DLM_MSG_CONVERT_REPLY:
3743 		receive_convert_reply(ls, ms);
3744 		break;
3745 
3746 	case DLM_MSG_UNLOCK_REPLY:
3747 		receive_unlock_reply(ls, ms);
3748 		break;
3749 
3750 	case DLM_MSG_CANCEL_REPLY:
3751 		receive_cancel_reply(ls, ms);
3752 		break;
3753 
3754 	/* messages sent from a master node (only two types of async msg) */
3755 
3756 	case DLM_MSG_GRANT:
3757 		receive_grant(ls, ms);
3758 		break;
3759 
3760 	case DLM_MSG_BAST:
3761 		receive_bast(ls, ms);
3762 		break;
3763 
3764 	/* messages sent to a dir node */
3765 
3766 	case DLM_MSG_LOOKUP:
3767 		receive_lookup(ls, ms);
3768 		break;
3769 
3770 	case DLM_MSG_REMOVE:
3771 		receive_remove(ls, ms);
3772 		break;
3773 
3774 	/* messages sent from a dir node (remove has no reply) */
3775 
3776 	case DLM_MSG_LOOKUP_REPLY:
3777 		receive_lookup_reply(ls, ms);
3778 		break;
3779 
3780 	/* other messages */
3781 
3782 	case DLM_MSG_PURGE:
3783 		receive_purge(ls, ms);
3784 		break;
3785 
3786 	default:
3787 		log_error(ls, "unknown message type %d", ms->m_type);
3788 	}
3789 
3790 	dlm_astd_wake();
3791 }
3792 
3793 /* If the lockspace is in recovery mode (locking stopped), then normal
3794    messages are saved on the requestqueue for processing after recovery is
3795    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3796    messages off the requestqueue before we process new ones. This occurs right
3797    after recovery completes when we transition from saving all messages on
3798    requestqueue, to processing all the saved messages, to processing new
3799    messages as they arrive. */
3800 
3801 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3802 				int nodeid)
3803 {
3804 	if (dlm_locking_stopped(ls)) {
3805 		dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
3806 	} else {
3807 		dlm_wait_requestqueue(ls);
3808 		_receive_message(ls, ms);
3809 	}
3810 }
3811 
3812 /* This is called by dlm_recoverd to process messages that were saved on
3813    the requestqueue. */
3814 
3815 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3816 {
3817 	_receive_message(ls, ms);
3818 }
3819 
3820 /* This is called by the midcomms layer when something is received for
3821    the lockspace.  It could be either a MSG (normal message sent as part of
3822    standard locking activity) or an RCOM (recovery message sent as part of
3823    lockspace recovery). */
3824 
3825 void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
3826 {
3827 	struct dlm_message *ms = (struct dlm_message *) hd;
3828 	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
3829 	struct dlm_ls *ls;
3830 	int type = 0;
3831 
3832 	switch (hd->h_cmd) {
3833 	case DLM_MSG:
3834 		dlm_message_in(ms);
3835 		type = ms->m_type;
3836 		break;
3837 	case DLM_RCOM:
3838 		dlm_rcom_in(rc);
3839 		type = rc->rc_type;
3840 		break;
3841 	default:
3842 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3843 		return;
3844 	}
3845 
3846 	if (hd->h_nodeid != nodeid) {
3847 		log_print("invalid h_nodeid %d from %d lockspace %x",
3848 			  hd->h_nodeid, nodeid, hd->h_lockspace);
3849 		return;
3850 	}
3851 
3852 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3853 	if (!ls) {
3854 		if (dlm_config.ci_log_debug)
3855 			log_print("invalid lockspace %x from %d cmd %d type %d",
3856 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
3857 
3858 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3859 			dlm_send_ls_not_ready(nodeid, rc);
3860 		return;
3861 	}
3862 
3863 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3864 	   be inactive (in this ls) before transitioning to recovery mode */
3865 
3866 	down_read(&ls->ls_recv_active);
3867 	if (hd->h_cmd == DLM_MSG)
3868 		dlm_receive_message(ls, ms, nodeid);
3869 	else
3870 		dlm_receive_rcom(ls, rc, nodeid);
3871 	up_read(&ls->ls_recv_active);
3872 
3873 	dlm_put_lockspace(ls);
3874 }
3875 
3876 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3877 {
3878 	if (middle_conversion(lkb)) {
3879 		hold_lkb(lkb);
3880 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3881 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3882 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3883 		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3884 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3885 
3886 		/* Same special case as in receive_rcom_lock_args() */
3887 		lkb->lkb_grmode = DLM_LOCK_IV;
3888 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3889 		unhold_lkb(lkb);
3890 
3891 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3892 		lkb->lkb_flags |= DLM_IFL_RESEND;
3893 	}
3894 
3895 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3896 	   conversions are async; there's no reply from the remote master */
3897 }
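
/* For reference, middle_conversion() (defined earlier in this file) is
   roughly the following test; a conversion between the two "middle"
   modes PR and CW cannot be completed with a faked reply, which is why
   it gets the special handling above: */
#if 0
static int example_middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode == DLM_LOCK_PR && lkb->lkb_rqmode == DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode == DLM_LOCK_PR && lkb->lkb_grmode == DLM_LOCK_CW))
		return 1;
	return 0;
}
#endif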
3898 
3899 /* A waiting lkb needs recovery if the master node has failed, or
3900    the master node is changing (only when no directory is used) */
3901 
3902 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3903 {
3904 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3905 		return 1;
3906 
3907 	if (!dlm_no_directory(ls))
3908 		return 0;
3909 
3910 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3911 		return 1;
3912 
3913 	return 0;
3914 }
3915 
3916 /* Recovery for locks that are waiting for replies from nodes that are now
3917    gone.  We can just complete unlocks and cancels by faking a reply from the
3918    dead node.  Requests and up-conversions we flag to be resent after
3919    recovery.  Down-conversions can just be completed with a fake reply like
3920    unlocks.  Conversions between PR and CW need special attention. */
3921 
3922 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3923 {
3924 	struct dlm_lkb *lkb, *safe;
3925 	int wait_type, stub_unlock_result, stub_cancel_result;
3926 
3927 	mutex_lock(&ls->ls_waiters_mutex);
3928 
3929 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3930 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3931 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3932 
3933 		/* all outstanding lookups, regardless of destination, will be
3934 		   resent after recovery is done */
3935 
3936 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3937 			lkb->lkb_flags |= DLM_IFL_RESEND;
3938 			continue;
3939 		}
3940 
3941 		if (!waiter_needs_recovery(ls, lkb))
3942 			continue;
3943 
3944 		wait_type = lkb->lkb_wait_type;
3945 		stub_unlock_result = -DLM_EUNLOCK;
3946 		stub_cancel_result = -DLM_ECANCEL;
3947 
3948 		/* Main reply may have been received leaving a zero wait_type,
3949 		   but a reply for the overlapping op may not have been
3950 		   received.  In that case we need to fake the appropriate
3951 		   reply for the overlap op. */
3952 
3953 		if (!wait_type) {
3954 			if (is_overlap_cancel(lkb)) {
3955 				wait_type = DLM_MSG_CANCEL;
3956 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3957 					stub_cancel_result = 0;
3958 			}
3959 			if (is_overlap_unlock(lkb)) {
3960 				wait_type = DLM_MSG_UNLOCK;
3961 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3962 					stub_unlock_result = -ENOENT;
3963 			}
3964 
3965 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
3966 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
3967 				  stub_cancel_result, stub_unlock_result);
3968 		}
3969 
3970 		switch (wait_type) {
3971 
3972 		case DLM_MSG_REQUEST:
3973 			lkb->lkb_flags |= DLM_IFL_RESEND;
3974 			break;
3975 
3976 		case DLM_MSG_CONVERT:
3977 			recover_convert_waiter(ls, lkb);
3978 			break;
3979 
3980 		case DLM_MSG_UNLOCK:
3981 			hold_lkb(lkb);
3982 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3983 			ls->ls_stub_ms.m_result = stub_unlock_result;
3984 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3985 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3986 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3987 			dlm_put_lkb(lkb);
3988 			break;
3989 
3990 		case DLM_MSG_CANCEL:
3991 			hold_lkb(lkb);
3992 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3993 			ls->ls_stub_ms.m_result = stub_cancel_result;
3994 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3995 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3996 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3997 			dlm_put_lkb(lkb);
3998 			break;
3999 
4000 		default:
4001 			log_error(ls, "invalid lkb wait_type %d %d",
4002 				  lkb->lkb_wait_type, wait_type);
4003 		}
4004 		schedule();
4005 	}
4006 	mutex_unlock(&ls->ls_waiters_mutex);
4007 }
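
/* For illustration only: the unlock and cancel arms above follow one
   stub-reply pattern.  A hypothetical helper (not part of this file)
   factoring it out might look like this, assuming the caller holds
   ls_waiters_mutex as dlm_recover_waiters_pre() does: */
#if 0
static void example_fake_reply(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       int reply_type, int result)
{
	hold_lkb(lkb);
	ls->ls_stub_ms.m_type = reply_type;
	ls->ls_stub_ms.m_result = result;
	ls->ls_stub_ms.m_flags = lkb->lkb_flags;
	ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;

	if (reply_type == DLM_MSG_UNLOCK_REPLY)
		_receive_unlock_reply(lkb, &ls->ls_stub_ms);
	else
		_receive_cancel_reply(lkb, &ls->ls_stub_ms);
	dlm_put_lkb(lkb);
}
#endif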
4008 
4009 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4010 {
4011 	struct dlm_lkb *lkb;
4012 	int found = 0;
4013 
4014 	mutex_lock(&ls->ls_waiters_mutex);
4015 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4016 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
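			/* take the reference while ls_waiters_mutex is
			   held so the lkb can't be freed before the
			   caller is done with it */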
4017 			hold_lkb(lkb);
4018 			found = 1;
4019 			break;
4020 		}
4021 	}
4022 	mutex_unlock(&ls->ls_waiters_mutex);
4023 
4024 	if (!found)
4025 		lkb = NULL;
4026 	return lkb;
4027 }
4028 
4029 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4030    master or dir-node for r.  Processing the lkb may result in it being placed
4031    back on waiters. */
4032 
4033 /* We do this after normal locking has been enabled and any saved messages
4034    (in requestqueue) have been processed.  We should be confident that at
4035    this point we won't get or process a reply to any of these waiting
4036    operations.  But, new ops may be coming in on the rsbs/locks here from
4037    userspace or remotely. */
4038 
4039 /* there may have been an overlap unlock/cancel prior to recovery or after
4040    recovery.  if before, the lkb may still have a positive wait_count; if
4041    after, the overlap flag would just have been set and nothing new sent.  we
4042    can be confident here that any replies to either the initial op or overlap
4043    ops prior to recovery have been received. */
4044 
4045 int dlm_recover_waiters_post(struct dlm_ls *ls)
4046 {
4047 	struct dlm_lkb *lkb;
4048 	struct dlm_rsb *r;
4049 	int error = 0, mstype, err, oc, ou;
4050 
4051 	while (1) {
4052 		if (dlm_locking_stopped(ls)) {
4053 			log_debug(ls, "recover_waiters_post aborted");
4054 			error = -EINTR;
4055 			break;
4056 		}
4057 
4058 		lkb = find_resend_waiter(ls);
4059 		if (!lkb)
4060 			break;
4061 
4062 		r = lkb->lkb_resource;
4063 		hold_rsb(r);
4064 		lock_rsb(r);
4065 
4066 		mstype = lkb->lkb_wait_type;
4067 		oc = is_overlap_cancel(lkb);
4068 		ou = is_overlap_unlock(lkb);
4069 		err = 0;
4070 
4071 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4072 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4073 
4074 		/* At this point we assume that we won't get a reply to any
4075 		   previous op or overlap op on this lock.  First, do a big
4076 		   remove_from_waiters() for all previous ops. */
4077 
4078 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4079 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4080 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4081 		lkb->lkb_wait_type = 0;
4082 		lkb->lkb_wait_count = 0;
4083 		mutex_lock(&ls->ls_waiters_mutex);
4084 		list_del_init(&lkb->lkb_wait_reply);
4085 		mutex_unlock(&ls->ls_waiters_mutex);
4086 		unhold_lkb(lkb); /* for waiters list */
4087 
4088 		if (oc || ou) {
4089 			/* do an unlock or cancel instead of resending */
4090 			switch (mstype) {
4091 			case DLM_MSG_LOOKUP:
4092 			case DLM_MSG_REQUEST:
4093 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4094 							-DLM_ECANCEL);
4095 				unhold_lkb(lkb); /* undoes create_lkb() */
4096 				break;
4097 			case DLM_MSG_CONVERT:
4098 				if (oc) {
4099 					queue_cast(r, lkb, -DLM_ECANCEL);
4100 				} else {
4101 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4102 					_unlock_lock(r, lkb);
4103 				}
4104 				break;
4105 			default:
4106 				err = 1;
4107 			}
4108 		} else {
4109 			switch (mstype) {
4110 			case DLM_MSG_LOOKUP:
4111 			case DLM_MSG_REQUEST:
4112 				_request_lock(r, lkb);
4113 				if (is_master(r))
4114 					confirm_master(r, 0);
4115 				break;
4116 			case DLM_MSG_CONVERT:
4117 				_convert_lock(r, lkb);
4118 				break;
4119 			default:
4120 				err = 1;
4121 			}
4122 		}
4123 
4124 		if (err)
4125 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4126 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4127 		unlock_rsb(r);
4128 		put_rsb(r);
4129 		dlm_put_lkb(lkb);
4130 	}
4131 
4132 	return error;
4133 }
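
/* In summary, each RESEND waiter found above is handled as follows:

   overlap unlock/cancel set:
     LOOKUP/REQUEST     completed with -DLM_EUNLOCK / -DLM_ECANCEL
     CONVERT + cancel   completed with -DLM_ECANCEL
     CONVERT + unlock   forced unlock via _unlock_lock()
   no overlap:
     LOOKUP/REQUEST     resent via _request_lock()
     CONVERT            resent via _convert_lock()
*/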
4134 
4135 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4136 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4137 {
4138 	struct dlm_ls *ls = r->res_ls;
4139 	struct dlm_lkb *lkb, *safe;
4140 
4141 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4142 		if (test(ls, lkb)) {
4143 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4144 			del_lkb(r, lkb);
4145 			/* this put should free the lkb */
4146 			if (!dlm_put_lkb(lkb))
4147 				log_error(ls, "purged lkb not released");
4148 		}
4149 	}
4150 }
4151 
4152 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4153 {
4154 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4155 }
4156 
4157 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4158 {
4159 	return is_master_copy(lkb);
4160 }
4161 
4162 static void purge_dead_locks(struct dlm_rsb *r)
4163 {
4164 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4165 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4166 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4167 }
4168 
4169 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4170 {
4171 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4172 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4173 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4174 }
4175 
4176 /* Get rid of locks held by nodes that are gone. */
4177 
4178 int dlm_purge_locks(struct dlm_ls *ls)
4179 {
4180 	struct dlm_rsb *r;
4181 
4182 	log_debug(ls, "dlm_purge_locks");
4183 
4184 	down_write(&ls->ls_root_sem);
4185 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4186 		hold_rsb(r);
4187 		lock_rsb(r);
4188 		if (is_master(r))
4189 			purge_dead_locks(r);
4190 		unlock_rsb(r);
4191 		unhold_rsb(r);
4192 
4193 		schedule();
4194 	}
4195 	up_write(&ls->ls_root_sem);
4196 
4197 	return 0;
4198 }
4199 
4200 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4201 {
4202 	struct dlm_rsb *r, *r_ret = NULL;
4203 
4204 	read_lock(&ls->ls_rsbtbl[bucket].lock);
4205 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4206 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4207 			continue;
4208 		hold_rsb(r);
4209 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4210 		r_ret = r;
4211 		break;
4212 	}
4213 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
4214 	return r_ret;
4215 }
4216 
4217 void dlm_grant_after_purge(struct dlm_ls *ls)
4218 {
4219 	struct dlm_rsb *r;
4220 	int bucket = 0;
4221 
4222 	while (1) {
4223 		r = find_purged_rsb(ls, bucket);
4224 		if (!r) {
4225 			if (bucket == ls->ls_rsbtbl_size - 1)
4226 				break;
4227 			bucket++;
4228 			continue;
4229 		}
4230 		lock_rsb(r);
4231 		if (is_master(r)) {
4232 			grant_pending_locks(r);
4233 			confirm_master(r, 0);
4234 		}
4235 		unlock_rsb(r);
4236 		put_rsb(r);
4237 		schedule();
4238 	}
4239 }
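
/* RSB_LOCKS_PURGED ties the two passes together: purge_queue() sets it
   on each rsb it removes locks from, and the scan above picks those
   rsbs back up (clearing the flag under the bucket lock) so pending
   locks unblocked by the purge can now be granted. */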
4240 
4241 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4242 					 uint32_t remid)
4243 {
4244 	struct dlm_lkb *lkb;
4245 
4246 	list_for_each_entry(lkb, head, lkb_statequeue) {
4247 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4248 			return lkb;
4249 	}
4250 	return NULL;
4251 }
4252 
4253 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4254 				    uint32_t remid)
4255 {
4256 	struct dlm_lkb *lkb;
4257 
4258 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4259 	if (lkb)
4260 		return lkb;
4261 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4262 	if (lkb)
4263 		return lkb;
4264 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4265 	if (lkb)
4266 		return lkb;
4267 	return NULL;
4268 }
4269 
4270 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4271 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4272 {
4273 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4274 	int lvblen;
4275 
4276 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4277 	lkb->lkb_ownpid = rl->rl_ownpid;
4278 	lkb->lkb_remid = rl->rl_lkid;
4279 	lkb->lkb_exflags = rl->rl_exflags;
4280 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4281 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4282 	lkb->lkb_lvbseq = rl->rl_lvbseq;
4283 	lkb->lkb_rqmode = rl->rl_rqmode;
4284 	lkb->lkb_grmode = rl->rl_grmode;
4285 	/* don't set lkb_status because add_lkb wants to set it itself */
4286 
4287 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4288 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4289 
4290 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4291 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4292 		if (!lkb->lkb_lvbptr)
4293 			return -ENOMEM;
4294 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4295 			 sizeof(struct rcom_lock);
4296 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4297 	}
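
	/* The lvb length is implied by the total message length rather
	   than carried explicitly:

		h_length = sizeof(struct dlm_rcom) +
			   sizeof(struct rcom_lock) + lvblen

	   e.g. a sender with a 32-byte lvb builds a message of that
	   total size, and the subtraction above recovers lvblen = 32. */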
4298 
4299 	/* Conversions between PR and CW (middle modes) need special handling.
4300 	   The real granted mode of these converting locks cannot be determined
4301 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4302 
4303 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4304 		rl->rl_status = DLM_LKSTS_CONVERT;
4305 		lkb->lkb_grmode = DLM_LOCK_IV;
4306 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4307 	}
4308 
4309 	return 0;
4310 }
4311 
4312 /* This lkb may have been recovered in a previous aborted recovery so we need
4313    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4314    If so we just send back a standard reply.  If not, we create a new lkb with
4315    the given values and send back our lkid.  We send back our lkid by sending
4316    back the rcom_lock struct we got but with the remid field filled in. */
4317 
4318 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4319 {
4320 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4321 	struct dlm_rsb *r;
4322 	struct dlm_lkb *lkb;
4323 	int error;
4324 
4325 	if (rl->rl_parent_lkid) {
4326 		error = -EOPNOTSUPP;
4327 		goto out;
4328 	}
4329 
4330 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4331 	if (error)
4332 		goto out;
4333 
4334 	lock_rsb(r);
4335 
4336 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4337 	if (lkb) {
4338 		error = -EEXIST;
4339 		goto out_remid;
4340 	}
4341 
4342 	error = create_lkb(ls, &lkb);
4343 	if (error)
4344 		goto out_unlock;
4345 
4346 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4347 	if (error) {
4348 		__put_lkb(ls, lkb);
4349 		goto out_unlock;
4350 	}
4351 
4352 	attach_lkb(r, lkb);
4353 	add_lkb(r, lkb, rl->rl_status);
4354 	error = 0;
4355 
4356  out_remid:
4357 	/* this is the new value returned to the lock holder for
4358 	   saving in its process-copy lkb */
4359 	rl->rl_remid = lkb->lkb_id;
4360 
4361  out_unlock:
4362 	unlock_rsb(r);
4363 	put_rsb(r);
4364  out:
4365 	if (error)
4366 		log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
4367 	rl->rl_result = error;
4368 	return error;
4369 }
4370 
4371 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4372 {
4373 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4374 	struct dlm_rsb *r;
4375 	struct dlm_lkb *lkb;
4376 	int error;
4377 
4378 	error = find_lkb(ls, rl->rl_lkid, &lkb);
4379 	if (error) {
4380 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4381 		return error;
4382 	}
4383 
4384 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4385 
4386 	error = rl->rl_result;
4387 
4388 	r = lkb->lkb_resource;
4389 	hold_rsb(r);
4390 	lock_rsb(r);
4391 
4392 	switch (error) {
4393 	case -EBADR:
4394 		/* There's a chance the new master received our lock before
4395 		   dlm_recover_master_reply(); this wouldn't happen if we did
4396 		   a barrier between recover_masters and recover_locks. */
4397 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4398 			  (unsigned long)r, r->res_name);
4399 		dlm_send_rcom_lock(r, lkb);
4400 		goto out;
4401 	case -EEXIST:
4402 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4403 		/* fall through */
4404 	case 0:
4405 		lkb->lkb_remid = rl->rl_remid;
4406 		break;
4407 	default:
4408 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4409 			  error, lkb->lkb_id);
4410 	}
4411 
4412 	/* an ack for dlm_recover_locks(), which waits for a reply for
4413 	   each of the locks it sends to new masters */
4414 	dlm_recovered_lock(r);
4415  out:
4416 	unlock_rsb(r);
4417 	put_rsb(r);
4418 	dlm_put_lkb(lkb);
4419 
4420 	return 0;
4421 }
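
/* Putting the two functions above together, the remid exchange during
   lock recovery (L = node holding the process copy, R = new master):

   L: dlm_send_rcom_lock()        ->  R: dlm_recover_master_copy()
                                          creates the MSTCPY lkb and
                                          fills in rl_remid = lkb_id
   L: dlm_recover_process_copy()  <-  R: (rcom_lock reply)
      saves rl_remid in lkb_remid
*/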
4422 
4423 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4424 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4425 		     unsigned long timeout_cs)
4426 {
4427 	struct dlm_lkb *lkb;
4428 	struct dlm_args args;
4429 	int error;
4430 
4431 	dlm_lock_recovery(ls);
4432 
4433 	error = create_lkb(ls, &lkb);
4434 	if (error) {
4435 		kfree(ua);
4436 		goto out;
4437 	}
4438 
4439 	if (flags & DLM_LKF_VALBLK) {
4440 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4441 		if (!ua->lksb.sb_lvbptr) {
4442 			kfree(ua);
4443 			__put_lkb(ls, lkb);
4444 			error = -ENOMEM;
4445 			goto out;
4446 		}
4447 	}
4448 
4449 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4450 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4451 	   lock and that lkb_astparam is the dlm_user_args structure. */
4452 
4453 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4454 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4455 	lkb->lkb_flags |= DLM_IFL_USER;
4456 	ua->old_mode = DLM_LOCK_IV;
4457 
4458 	if (error) {
4459 		__put_lkb(ls, lkb);
4460 		goto out;
4461 	}
4462 
4463 	error = request_lock(ls, lkb, name, namelen, &args);
4464 
4465 	switch (error) {
4466 	case 0:
4467 		break;
4468 	case -EINPROGRESS:
4469 		error = 0;
4470 		break;
4471 	case -EAGAIN:
4472 		error = 0;
4473 		/* fall through */
4474 	default:
4475 		__put_lkb(ls, lkb);
4476 		goto out;
4477 	}
4478 
4479 	/* add this new lkb to the per-process list of locks */
4480 	spin_lock(&ua->proc->locks_spin);
4481 	hold_lkb(lkb);
4482 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4483 	spin_unlock(&ua->proc->locks_spin);
4484  out:
4485 	dlm_unlock_recovery(ls);
4486 	return error;
4487 }
4488 
4489 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4490 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4491 		     unsigned long timeout_cs)
4492 {
4493 	struct dlm_lkb *lkb;
4494 	struct dlm_args args;
4495 	struct dlm_user_args *ua;
4496 	int error;
4497 
4498 	dlm_lock_recovery(ls);
4499 
4500 	error = find_lkb(ls, lkid, &lkb);
4501 	if (error)
4502 		goto out;
4503 
4504 	/* user can change the params on its lock when it converts it, or
4505 	   add an lvb that didn't exist before */
4506 
4507 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4508 
4509 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4510 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4511 		if (!ua->lksb.sb_lvbptr) {
4512 			error = -ENOMEM;
4513 			goto out_put;
4514 		}
4515 	}
4516 	if (lvb_in && ua->lksb.sb_lvbptr)
4517 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4518 
4519 	ua->xid = ua_tmp->xid;
4520 	ua->castparam = ua_tmp->castparam;
4521 	ua->castaddr = ua_tmp->castaddr;
4522 	ua->bastparam = ua_tmp->bastparam;
4523 	ua->bastaddr = ua_tmp->bastaddr;
4524 	ua->user_lksb = ua_tmp->user_lksb;
4525 	ua->old_mode = lkb->lkb_grmode;
4526 
4527 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4528 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4529 	if (error)
4530 		goto out_put;
4531 
4532 	error = convert_lock(ls, lkb, &args);
4533 
4534 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4535 		error = 0;
4536  out_put:
4537 	dlm_put_lkb(lkb);
4538  out:
4539 	dlm_unlock_recovery(ls);
4540 	kfree(ua_tmp);
4541 	return error;
4542 }
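
/* The two user entry points above roughly mirror the in-kernel API: a
   new request is dlm_lock() without DLM_LKF_CONVERT, a conversion is
   dlm_lock() with it.  A minimal in-kernel sketch (error handling
   elided; assumes a lockspace already joined and linux/dlm.h): */
#if 0
static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* completion status is in example_lksb.sb_status */
}

static void example_request_then_convert(dlm_lockspace_t *lockspace)
{
	/* acquire a new PR lock on the 7-byte resource name "example" */
	dlm_lock(lockspace, DLM_LOCK_PR, &example_lksb, 0, "example", 7,
		 0, example_ast, NULL, NULL);
	/* ...once granted, convert it to EX; the lkid lives in the lksb */
	dlm_lock(lockspace, DLM_LOCK_EX, &example_lksb, DLM_LKF_CONVERT,
		 NULL, 0, 0, example_ast, NULL, NULL);
}
#endif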
4543 
4544 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4545 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4546 {
4547 	struct dlm_lkb *lkb;
4548 	struct dlm_args args;
4549 	struct dlm_user_args *ua;
4550 	int error;
4551 
4552 	dlm_lock_recovery(ls);
4553 
4554 	error = find_lkb(ls, lkid, &lkb);
4555 	if (error)
4556 		goto out;
4557 
4558 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4559 
4560 	if (lvb_in && ua->lksb.sb_lvbptr)
4561 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4562 	if (ua_tmp->castparam)
4563 		ua->castparam = ua_tmp->castparam;
4564 	ua->user_lksb = ua_tmp->user_lksb;
4565 
4566 	error = set_unlock_args(flags, ua, &args);
4567 	if (error)
4568 		goto out_put;
4569 
4570 	error = unlock_lock(ls, lkb, &args);
4571 
4572 	if (error == -DLM_EUNLOCK)
4573 		error = 0;
4574 	/* from validate_unlock_args() */
4575 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4576 		error = 0;
4577 	if (error)
4578 		goto out_put;
4579 
4580 	spin_lock(&ua->proc->locks_spin);
4581 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4582 	if (!list_empty(&lkb->lkb_ownqueue))
4583 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4584 	spin_unlock(&ua->proc->locks_spin);
4585  out_put:
4586 	dlm_put_lkb(lkb);
4587  out:
4588 	dlm_unlock_recovery(ls);
4589 	kfree(ua_tmp);
4590 	return error;
4591 }
4592 
4593 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4594 		    uint32_t flags, uint32_t lkid)
4595 {
4596 	struct dlm_lkb *lkb;
4597 	struct dlm_args args;
4598 	struct dlm_user_args *ua;
4599 	int error;
4600 
4601 	dlm_lock_recovery(ls);
4602 
4603 	error = find_lkb(ls, lkid, &lkb);
4604 	if (error)
4605 		goto out;
4606 
4607 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4608 	if (ua_tmp->castparam)
4609 		ua->castparam = ua_tmp->castparam;
4610 	ua->user_lksb = ua_tmp->user_lksb;
4611 
4612 	error = set_unlock_args(flags, ua, &args);
4613 	if (error)
4614 		goto out_put;
4615 
4616 	error = cancel_lock(ls, lkb, &args);
4617 
4618 	if (error == -DLM_ECANCEL)
4619 		error = 0;
4620 	/* from validate_unlock_args() */
4621 	if (error == -EBUSY)
4622 		error = 0;
4623  out_put:
4624 	dlm_put_lkb(lkb);
4625  out:
4626 	dlm_unlock_recovery(ls);
4627 	kfree(ua_tmp);
4628 	return error;
4629 }
4630 
4631 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4632 {
4633 	struct dlm_lkb *lkb;
4634 	struct dlm_args args;
4635 	struct dlm_user_args *ua;
4636 	struct dlm_rsb *r;
4637 	int error;
4638 
4639 	dlm_lock_recovery(ls);
4640 
4641 	error = find_lkb(ls, lkid, &lkb);
4642 	if (error)
4643 		goto out;
4644 
4645 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4646 
4647 	error = set_unlock_args(flags, ua, &args);
4648 	if (error)
4649 		goto out_put;
4650 
4651 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4652 
4653 	r = lkb->lkb_resource;
4654 	hold_rsb(r);
4655 	lock_rsb(r);
4656 
4657 	error = validate_unlock_args(lkb, &args);
4658 	if (error)
4659 		goto out_r;
4660 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4661 
4662 	error = _cancel_lock(r, lkb);
4663  out_r:
4664 	unlock_rsb(r);
4665 	put_rsb(r);
4666 
4667 	if (error == -DLM_ECANCEL)
4668 		error = 0;
4669 	/* from validate_unlock_args() */
4670 	if (error == -EBUSY)
4671 		error = 0;
4672  out_put:
4673 	dlm_put_lkb(lkb);
4674  out:
4675 	dlm_unlock_recovery(ls);
4676 	return error;
4677 }
4678 
4679 /* lkb's that are removed from the waiters list by revert are just left on the
4680    orphans list with the granted orphan locks, to be freed by purge */
4681 
4682 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4683 {
4684 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4685 	struct dlm_args args;
4686 	int error;
4687 
4688 	hold_lkb(lkb);
4689 	mutex_lock(&ls->ls_orphans_mutex);
4690 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4691 	mutex_unlock(&ls->ls_orphans_mutex);
4692 
4693 	set_unlock_args(0, ua, &args);
4694 
4695 	error = cancel_lock(ls, lkb, &args);
4696 	if (error == -DLM_ECANCEL)
4697 		error = 0;
4698 	return error;
4699 }
4700 
4701 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4702    Regardless of what rsb queue the lock is on, it's removed and freed. */
4703 
4704 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4705 {
4706 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4707 	struct dlm_args args;
4708 	int error;
4709 
4710 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4711 
4712 	error = unlock_lock(ls, lkb, &args);
4713 	if (error == -DLM_EUNLOCK)
4714 		error = 0;
4715 	return error;
4716 }
4717 
4718 /* We have to release the clear_proc_locks mutex before calling
4719    unlock_proc_lock() (which does lock_rsb), to avoid deadlocking with a
4720    message receive path that does lock_rsb followed by dlm_user_add_ast() */
4721 
4722 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4723 				     struct dlm_user_proc *proc)
4724 {
4725 	struct dlm_lkb *lkb = NULL;
4726 
4727 	mutex_lock(&ls->ls_clear_proc_locks);
4728 	if (list_empty(&proc->locks))
4729 		goto out;
4730 
4731 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4732 	list_del_init(&lkb->lkb_ownqueue);
4733 
4734 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4735 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4736 	else
4737 		lkb->lkb_flags |= DLM_IFL_DEAD;
4738  out:
4739 	mutex_unlock(&ls->ls_clear_proc_locks);
4740 	return lkb;
4741 }
4742 
4743 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
4744    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4745    which we clear here. */
4746 
4747 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4748    list, and no more device_writes should add lkb's to proc->locks list; so we
4749    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4750    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4751    them ourselves. */
4752 
4753 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4754 {
4755 	struct dlm_lkb *lkb, *safe;
4756 
4757 	dlm_lock_recovery(ls);
4758 
4759 	while (1) {
4760 		lkb = del_proc_lock(ls, proc);
4761 		if (!lkb)
4762 			break;
4763 		del_timeout(lkb);
4764 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4765 			orphan_proc_lock(ls, lkb);
4766 		else
4767 			unlock_proc_lock(ls, lkb);
4768 
4769 		/* this removes the reference for the proc->locks list
4770 		   added by dlm_user_request, it may result in the lkb
4771 		   being freed */
4772 
4773 		dlm_put_lkb(lkb);
4774 	}
4775 
4776 	mutex_lock(&ls->ls_clear_proc_locks);
4777 
4778 	/* in-progress unlocks */
4779 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4780 		list_del_init(&lkb->lkb_ownqueue);
4781 		lkb->lkb_flags |= DLM_IFL_DEAD;
4782 		dlm_put_lkb(lkb);
4783 	}
4784 
4785 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4786 		lkb->lkb_ast_type = 0;
4787 		list_del(&lkb->lkb_astqueue);
4788 		dlm_put_lkb(lkb);
4789 	}
4790 
4791 	mutex_unlock(&ls->ls_clear_proc_locks);
4792 	dlm_unlock_recovery(ls);
4793 }
4794 
4795 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4796 {
4797 	struct dlm_lkb *lkb, *safe;
4798 
4799 	while (1) {
4800 		lkb = NULL;
4801 		spin_lock(&proc->locks_spin);
4802 		if (!list_empty(&proc->locks)) {
4803 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4804 					 lkb_ownqueue);
4805 			list_del_init(&lkb->lkb_ownqueue);
4806 		}
4807 		spin_unlock(&proc->locks_spin);
4808 
4809 		if (!lkb)
4810 			break;
4811 
4812 		lkb->lkb_flags |= DLM_IFL_DEAD;
4813 		unlock_proc_lock(ls, lkb);
4814 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4815 	}
4816 
4817 	spin_lock(&proc->locks_spin);
4818 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4819 		list_del_init(&lkb->lkb_ownqueue);
4820 		lkb->lkb_flags |= DLM_IFL_DEAD;
4821 		dlm_put_lkb(lkb);
4822 	}
4823 	spin_unlock(&proc->locks_spin);
4824 
4825 	spin_lock(&proc->asts_spin);
4826 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4827 		list_del(&lkb->lkb_astqueue);
4828 		dlm_put_lkb(lkb);
4829 	}
4830 	spin_unlock(&proc->asts_spin);
4831 }
4832 
4833 /* pid of 0 means purge all orphans */
4834 
4835 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4836 {
4837 	struct dlm_lkb *lkb, *safe;
4838 
4839 	mutex_lock(&ls->ls_orphans_mutex);
4840 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4841 		if (pid && lkb->lkb_ownpid != pid)
4842 			continue;
4843 		unlock_proc_lock(ls, lkb);
4844 		list_del_init(&lkb->lkb_ownqueue);
4845 		dlm_put_lkb(lkb);
4846 	}
4847 	mutex_unlock(&ls->ls_orphans_mutex);
4848 }
4849 
4850 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4851 {
4852 	struct dlm_message *ms;
4853 	struct dlm_mhandle *mh;
4854 	int error;
4855 
4856 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4857 				DLM_MSG_PURGE, &ms, &mh);
4858 	if (error)
4859 		return error;
4860 	ms->m_nodeid = nodeid;
4861 	ms->m_pid = pid;
4862 
4863 	return send_message(mh, ms);
4864 }
4865 
4866 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4867 		   int nodeid, int pid)
4868 {
4869 	int error = 0;
4870 
4871 	if (nodeid != dlm_our_nodeid()) {
4872 		error = send_purge(ls, nodeid, pid);
4873 	} else {
4874 		dlm_lock_recovery(ls);
4875 		if (pid == current->pid)
4876 			purge_proc_locks(ls, proc);
4877 		else
4878 			do_purge(ls, nodeid, pid);
4879 		dlm_unlock_recovery(ls);
4880 	}
4881 	return error;
4882 }
4883 
4884