1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
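
/*
 * A rough sketch of the stage-1 split described above (illustrative
 * only; the real dlm_lock()/dlm_unlock() bodies do more argument
 * validation):
 *
 *	dlm_lock(...):
 *		if (flags & DLM_LKF_CONVERT)
 *			convert_lock(ls, lkb);
 *		else
 *			request_lock(ls, lkb);
 *
 *	dlm_unlock(...):
 *		if (flags & DLM_LKF_CANCEL)
 *			cancel_lock(ls, lkb);
 *		else
 *			unlock_lock(ls, lkb);
 */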
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 				    struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 void dlm_timeout_warn(struct dlm_lkb *lkb);
92 
93 /*
94  * Lock compatibility matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100 
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112 
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121 
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133 
134 #define modes_compat(gr, rq) \
135 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136 
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
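
/*
 * Example lookups in the table above (values read straight from the
 * matrix; modes use their standard dlm numbering):
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1	two PR locks coexist
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PW) == 0	PR blocks PW
 *	dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX) == 1	NL is compatible with all
 */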
141 
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147 
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159 
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
167 }
168 
169 void dlm_print_rsb(struct dlm_rsb *r)
170 {
171 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
173 	       r->res_recover_locks_count, r->res_name);
174 }
175 
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178 	struct dlm_lkb *lkb;
179 
180 	dlm_print_rsb(r);
181 
182 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184 	printk(KERN_ERR "rsb lookup list\n");
185 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186 		dlm_print_lkb(lkb);
187 	printk(KERN_ERR "rsb grant queue:\n");
188 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189 		dlm_print_lkb(lkb);
190 	printk(KERN_ERR "rsb convert queue:\n");
191 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192 		dlm_print_lkb(lkb);
193 	printk(KERN_ERR "rsb wait queue:\n");
194 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195 		dlm_print_lkb(lkb);
196 }
197 
198 /* Threads cannot use the lockspace while it's being recovered */
199 
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202 	down_read(&ls->ls_in_recovery);
203 }
204 
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207 	up_read(&ls->ls_in_recovery);
208 }
209 
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212 	return down_read_trylock(&ls->ls_in_recovery);
213 }
214 
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219 
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224 
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229 
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234 
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239 
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 	return !!r->res_nodeid;
244 }
245 
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250 
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257 
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262 		return 1;
263 	return 0;
264 }
265 
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
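
/*
 * PR and CW are the two incomparable modes, so a PR<->CW conversion
 * is "middle": neither an up- nor a down-conversion.  Illustrative
 * values (assumed here for the example):
 *
 *	gr=PR rq=CW:	middle_conversion() == 1, down_conversion() == 0
 *	gr=EX rq=NL:	middle_conversion() == 0, down_conversion() == 1
 *	gr=NL rq=EX:	middle_conversion() == 0, down_conversion() == 0
 */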
270 
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275 
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280 
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 				  DLM_IFL_OVERLAP_CANCEL));
285 }
286 
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289 	if (is_master_copy(lkb))
290 		return;
291 
292 	del_timeout(lkb);
293 
294 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295 
296 	/* if the operation was a cancel, return -DLM_ECANCEL; if a
297 	   timeout caused the cancel, return -ETIMEDOUT */
298 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300 		rv = -ETIMEDOUT;
301 	}
302 
303 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305 		rv = -EDEADLK;
306 	}
307 
308 	lkb->lkb_lksb->sb_status = rv;
309 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
310 
311 	dlm_add_ast(lkb, AST_COMP);
312 }
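
/*
 * The net effect for the caller's lksb (an illustrative mapping of
 * the cases handled above):
 *
 *	plain cancel			sb_status == -DLM_ECANCEL
 *	cancel caused by a timeout	sb_status == -ETIMEDOUT
 *	cancel resolving a deadlock	sb_status == -EDEADLK
 */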
313 
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316 	queue_cast(r, lkb,
317 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319 
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322 	if (is_master_copy(lkb))
323 		send_bast(r, lkb, rqmode);
324 	else {
325 		lkb->lkb_bastmode = rqmode;
326 		dlm_add_ast(lkb, AST_BAST);
327 	}
328 }
329 
330 /*
331  * Basic operations on rsb's and lkb's
332  */
333 
334 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
335 {
336 	struct dlm_rsb *r;
337 
338 	r = allocate_rsb(ls, len);
339 	if (!r)
340 		return NULL;
341 
342 	r->res_ls = ls;
343 	r->res_length = len;
344 	memcpy(r->res_name, name, len);
345 	mutex_init(&r->res_mutex);
346 
347 	INIT_LIST_HEAD(&r->res_lookup);
348 	INIT_LIST_HEAD(&r->res_grantqueue);
349 	INIT_LIST_HEAD(&r->res_convertqueue);
350 	INIT_LIST_HEAD(&r->res_waitqueue);
351 	INIT_LIST_HEAD(&r->res_root_list);
352 	INIT_LIST_HEAD(&r->res_recover_list);
353 
354 	return r;
355 }
356 
357 static int search_rsb_list(struct list_head *head, char *name, int len,
358 			   unsigned int flags, struct dlm_rsb **r_ret)
359 {
360 	struct dlm_rsb *r;
361 	int error = 0;
362 
363 	list_for_each_entry(r, head, res_hashchain) {
364 		if (len == r->res_length && !memcmp(name, r->res_name, len))
365 			goto found;
366 	}
367 	return -EBADR;
368 
369  found:
370 	if (r->res_nodeid && (flags & R_MASTER))
371 		error = -ENOTBLK;
372 	*r_ret = r;
373 	return error;
374 }
375 
376 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
377 		       unsigned int flags, struct dlm_rsb **r_ret)
378 {
379 	struct dlm_rsb *r;
380 	int error;
381 
382 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
383 	if (!error) {
384 		kref_get(&r->res_ref);
385 		goto out;
386 	}
387 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
388 	if (error)
389 		goto out;
390 
391 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
392 
393 	if (dlm_no_directory(ls))
394 		goto out;
395 
396 	if (r->res_nodeid == -1) {
397 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
398 		r->res_first_lkid = 0;
399 	} else if (r->res_nodeid > 0) {
400 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
401 		r->res_first_lkid = 0;
402 	} else {
403 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
404 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
405 	}
406  out:
407 	*r_ret = r;
408 	return error;
409 }
410 
411 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
412 		      unsigned int flags, struct dlm_rsb **r_ret)
413 {
414 	int error;
415 	write_lock(&ls->ls_rsbtbl[b].lock);
416 	error = _search_rsb(ls, name, len, b, flags, r_ret);
417 	write_unlock(&ls->ls_rsbtbl[b].lock);
418 	return error;
419 }
420 
421 /*
422  * Find rsb in rsbtbl and potentially create/add one
423  *
424  * Delaying the release of rsb's has a similar benefit to applications keeping
425  * NL locks on an rsb, but without the guarantee that the cached master value
426  * will still be valid when the rsb is reused.  Apps aren't always smart enough
427  * to keep NL locks on an rsb that they may lock again shortly; this can lead
428  * to excessive master lookups and removals if we don't delay the release.
429  *
430  * Searching for an rsb means looking through both the normal list and toss
431  * list.  When found on the toss list the rsb is moved to the normal list with
432  * ref count of 1; when found on normal list the ref count is incremented.
433  */
434 
435 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
436 		    unsigned int flags, struct dlm_rsb **r_ret)
437 {
438 	struct dlm_rsb *r, *tmp;
439 	uint32_t hash, bucket;
440 	int error = 0;
441 
442 	if (dlm_no_directory(ls))
443 		flags |= R_CREATE;
444 
445 	hash = jhash(name, namelen, 0);
446 	bucket = hash & (ls->ls_rsbtbl_size - 1);
447 
448 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
449 	if (!error)
450 		goto out;
451 
452 	if (error == -EBADR && !(flags & R_CREATE))
453 		goto out;
454 
455 	/* the rsb was found but wasn't a master copy */
456 	if (error == -ENOTBLK)
457 		goto out;
458 
459 	error = -ENOMEM;
460 	r = create_rsb(ls, name, namelen);
461 	if (!r)
462 		goto out;
463 
464 	r->res_hash = hash;
465 	r->res_bucket = bucket;
466 	r->res_nodeid = -1;
467 	kref_init(&r->res_ref);
468 
469 	/* With no directory, the master can be set immediately */
470 	if (dlm_no_directory(ls)) {
471 		int nodeid = dlm_dir_nodeid(r);
472 		if (nodeid == dlm_our_nodeid())
473 			nodeid = 0;
474 		r->res_nodeid = nodeid;
475 	}
476 
477 	write_lock(&ls->ls_rsbtbl[bucket].lock);
478 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
479 	if (!error) {
480 		write_unlock(&ls->ls_rsbtbl[bucket].lock);
481 		free_rsb(r);
482 		r = tmp;
483 		goto out;
484 	}
485 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
486 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
487 	error = 0;
488  out:
489 	*r_ret = r;
490 	return error;
491 }
492 
493 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
494 		 unsigned int flags, struct dlm_rsb **r_ret)
495 {
496 	return find_rsb(ls, name, namelen, flags, r_ret);
497 }
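
/*
 * A minimal caller sketch (hypothetical; error handling trimmed).
 * On success the caller owns one reference, whether the rsb came
 * from the normal list, was revived from the toss list, or was
 * newly created:
 *
 *	struct dlm_rsb *r;
 *	int error;
 *
 *	error = find_rsb(ls, name, namelen, R_CREATE, &r);
 *	if (error)
 *		return error;
 *	... use r ...
 *	put_rsb(r);
 */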
498 
499 /* This is only called to add a reference when the code already holds
500    a valid reference to the rsb, so there's no need for locking. */
501 
502 static inline void hold_rsb(struct dlm_rsb *r)
503 {
504 	kref_get(&r->res_ref);
505 }
506 
507 void dlm_hold_rsb(struct dlm_rsb *r)
508 {
509 	hold_rsb(r);
510 }
511 
512 static void toss_rsb(struct kref *kref)
513 {
514 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
515 	struct dlm_ls *ls = r->res_ls;
516 
517 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
518 	kref_init(&r->res_ref);
519 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
520 	r->res_toss_time = jiffies;
521 	if (r->res_lvbptr) {
522 		free_lvb(r->res_lvbptr);
523 		r->res_lvbptr = NULL;
524 	}
525 }
526 
527 /* When all references to the rsb are gone it's transferred to
528    the tossed list for later disposal. */
529 
530 static void put_rsb(struct dlm_rsb *r)
531 {
532 	struct dlm_ls *ls = r->res_ls;
533 	uint32_t bucket = r->res_bucket;
534 
535 	write_lock(&ls->ls_rsbtbl[bucket].lock);
536 	kref_put(&r->res_ref, toss_rsb);
537 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
538 }
539 
540 void dlm_put_rsb(struct dlm_rsb *r)
541 {
542 	put_rsb(r);
543 }
544 
545 /* See comment for unhold_lkb */
546 
547 static void unhold_rsb(struct dlm_rsb *r)
548 {
549 	int rv;
550 	rv = kref_put(&r->res_ref, toss_rsb);
551 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
552 }
553 
554 static void kill_rsb(struct kref *kref)
555 {
556 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
557 
558 	/* All work is done after the return from kref_put() so we
559 	   can release the write_lock before the remove and free. */
560 
561 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
562 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
563 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
564 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
565 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
566 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
567 }
568 
569 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
570    The rsb must exist as long as any lkb's for it do. */
571 
572 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
573 {
574 	hold_rsb(r);
575 	lkb->lkb_resource = r;
576 }
577 
578 static void detach_lkb(struct dlm_lkb *lkb)
579 {
580 	if (lkb->lkb_resource) {
581 		put_rsb(lkb->lkb_resource);
582 		lkb->lkb_resource = NULL;
583 	}
584 }
585 
586 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
587 {
588 	struct dlm_lkb *lkb, *tmp;
589 	uint32_t lkid = 0;
590 	uint16_t bucket;
591 
592 	lkb = allocate_lkb(ls);
593 	if (!lkb)
594 		return -ENOMEM;
595 
596 	lkb->lkb_nodeid = -1;
597 	lkb->lkb_grmode = DLM_LOCK_IV;
598 	kref_init(&lkb->lkb_ref);
599 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
600 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601 	INIT_LIST_HEAD(&lkb->lkb_time_list);
602 
603 	get_random_bytes(&bucket, sizeof(bucket));
604 	bucket &= (ls->ls_lkbtbl_size - 1);
605 
606 	write_lock(&ls->ls_lkbtbl[bucket].lock);
607 
608 	/* counter can roll over so we must verify lkid is not in use */
609 
610 	while (lkid == 0) {
611 		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
612 
613 		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
614 				    lkb_idtbl_list) {
615 			if (tmp->lkb_id != lkid)
616 				continue;
617 			lkid = 0;
618 			break;
619 		}
620 	}
621 
622 	lkb->lkb_id = lkid;
623 	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
624 	write_unlock(&ls->ls_lkbtbl[bucket].lock);
625 
626 	*lkb_ret = lkb;
627 	return 0;
628 }
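
/*
 * The lkid carries the lkbtbl bucket in its upper 16 bits and the
 * per-bucket counter in its lower 16, so lookups can go straight to
 * the right bucket.  Illustrative values:
 *
 *	bucket 0x0007, counter 0x0001	->  lkid 0x00070001
 *	bucket = (lkid >> 16)		recovers 0x0007
 */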
629 
630 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
631 {
632 	struct dlm_lkb *lkb;
633 	uint16_t bucket = (lkid >> 16);
634 
635 	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
636 		if (lkb->lkb_id == lkid)
637 			return lkb;
638 	}
639 	return NULL;
640 }
641 
642 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
643 {
644 	struct dlm_lkb *lkb;
645 	uint16_t bucket = (lkid >> 16);
646 
647 	if (bucket >= ls->ls_lkbtbl_size)
648 		return -EBADSLT;
649 
650 	read_lock(&ls->ls_lkbtbl[bucket].lock);
651 	lkb = __find_lkb(ls, lkid);
652 	if (lkb)
653 		kref_get(&lkb->lkb_ref);
654 	read_unlock(&ls->ls_lkbtbl[bucket].lock);
655 
656 	*lkb_ret = lkb;
657 	return lkb ? 0 : -ENOENT;
658 }
659 
660 static void kill_lkb(struct kref *kref)
661 {
662 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
663 
664 	/* All work is done after the return from kref_put() so we
665 	   can release the write_lock before the detach_lkb */
666 
667 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
668 }
669 
670 /* __put_lkb() is used when an lkb may not have an rsb attached to
671    it so we need to provide the lockspace explicitly */
672 
673 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
674 {
675 	uint16_t bucket = (lkb->lkb_id >> 16);
676 
677 	write_lock(&ls->ls_lkbtbl[bucket].lock);
678 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
679 		list_del(&lkb->lkb_idtbl_list);
680 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
681 
682 		detach_lkb(lkb);
683 
684 		/* for local/process lkbs, lvbptr points to caller's lksb */
685 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
686 			free_lvb(lkb->lkb_lvbptr);
687 		free_lkb(lkb);
688 		return 1;
689 	} else {
690 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
691 		return 0;
692 	}
693 }
694 
695 int dlm_put_lkb(struct dlm_lkb *lkb)
696 {
697 	struct dlm_ls *ls;
698 
699 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
700 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
701 
702 	ls = lkb->lkb_resource->res_ls;
703 	return __put_lkb(ls, lkb);
704 }
705 
706 /* This is only called to add a reference when the code already holds
707    a valid reference to the lkb, so there's no need for locking. */
708 
709 static inline void hold_lkb(struct dlm_lkb *lkb)
710 {
711 	kref_get(&lkb->lkb_ref);
712 }
713 
714 /* This is called when we need to remove a reference and are certain
715    it's not the last ref.  e.g. del_lkb is always called between a
716    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
717    put_lkb would work fine, but would involve unnecessary locking */
718 
719 static inline void unhold_lkb(struct dlm_lkb *lkb)
720 {
721 	int rv;
722 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
723 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
724 }
725 
726 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
727 			    int mode)
728 {
729 	struct dlm_lkb *lkb = NULL;
730 
731 	list_for_each_entry(lkb, head, lkb_statequeue)
732 		if (lkb->lkb_rqmode < mode)
733 			break;
734 
735 	if (!lkb)
736 		list_add_tail(new, head);
737 	else
738 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
739 }
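
/*
 * The queue is kept in decreasing mode order: a new entry is
 * inserted before the first entry with a lower mode.  For example,
 * adding a CW entry to an assumed queue of EX, PR, NL:
 *
 *	before:	EX PR NL
 *	after:	EX PR CW NL
 */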
740 
741 /* add/remove lkb to rsb's grant/convert/wait queue */
742 
743 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
744 {
745 	kref_get(&lkb->lkb_ref);
746 
747 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
748 
749 	lkb->lkb_status = status;
750 
751 	switch (status) {
752 	case DLM_LKSTS_WAITING:
753 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
754 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
755 		else
756 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
757 		break;
758 	case DLM_LKSTS_GRANTED:
759 		/* convention says granted locks kept in order of grmode */
760 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
761 				lkb->lkb_grmode);
762 		break;
763 	case DLM_LKSTS_CONVERT:
764 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
765 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
766 		else
767 			list_add_tail(&lkb->lkb_statequeue,
768 				      &r->res_convertqueue);
769 		break;
770 	default:
771 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
772 	}
773 }
774 
775 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
776 {
777 	lkb->lkb_status = 0;
778 	list_del(&lkb->lkb_statequeue);
779 	unhold_lkb(lkb);
780 }
781 
782 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
783 {
784 	hold_lkb(lkb);
785 	del_lkb(r, lkb);
786 	add_lkb(r, lkb, sts);
787 	unhold_lkb(lkb);
788 }
789 
790 static int msg_reply_type(int mstype)
791 {
792 	switch (mstype) {
793 	case DLM_MSG_REQUEST:
794 		return DLM_MSG_REQUEST_REPLY;
795 	case DLM_MSG_CONVERT:
796 		return DLM_MSG_CONVERT_REPLY;
797 	case DLM_MSG_UNLOCK:
798 		return DLM_MSG_UNLOCK_REPLY;
799 	case DLM_MSG_CANCEL:
800 		return DLM_MSG_CANCEL_REPLY;
801 	case DLM_MSG_LOOKUP:
802 		return DLM_MSG_LOOKUP_REPLY;
803 	}
804 	return -1;
805 }
806 
807 /* add/remove lkb from global waiters list of lkb's waiting for
808    a reply from a remote node */
809 
810 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
811 {
812 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
813 	int error = 0;
814 
815 	mutex_lock(&ls->ls_waiters_mutex);
816 
817 	if (is_overlap_unlock(lkb) ||
818 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
819 		error = -EINVAL;
820 		goto out;
821 	}
822 
823 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
824 		switch (mstype) {
825 		case DLM_MSG_UNLOCK:
826 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
827 			break;
828 		case DLM_MSG_CANCEL:
829 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
830 			break;
831 		default:
832 			error = -EBUSY;
833 			goto out;
834 		}
835 		lkb->lkb_wait_count++;
836 		hold_lkb(lkb);
837 
838 		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
839 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
840 			  lkb->lkb_wait_count, lkb->lkb_flags);
841 		goto out;
842 	}
843 
844 	DLM_ASSERT(!lkb->lkb_wait_count,
845 		   dlm_print_lkb(lkb);
846 		   printk("wait_count %d\n", lkb->lkb_wait_count););
847 
848 	lkb->lkb_wait_count++;
849 	lkb->lkb_wait_type = mstype;
850 	hold_lkb(lkb);
851 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
852  out:
853 	if (error)
854 		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
855 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
856 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
857 	mutex_unlock(&ls->ls_waiters_mutex);
858 	return error;
859 }
860 
861 /* We clear the RESEND flag because we might be taking an lkb off the waiters
862    list as part of process_requestqueue (e.g. a lookup that has an optimized
863    request reply on the requestqueue) between dlm_recover_waiters_pre() which
864    set RESEND and dlm_recover_waiters_post() */
865 
866 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
867 {
868 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
869 	int overlap_done = 0;
870 
871 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
872 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
873 		overlap_done = 1;
874 		goto out_del;
875 	}
876 
877 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
878 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
879 		overlap_done = 1;
880 		goto out_del;
881 	}
882 
883 	/* N.B. type of reply may not always correspond to type of original
884 	   msg due to lookup->request optimization, verify others? */
885 
886 	if (lkb->lkb_wait_type) {
887 		lkb->lkb_wait_type = 0;
888 		goto out_del;
889 	}
890 
891 	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
892 		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
893 	return -1;
894 
895  out_del:
896 	/* the force-unlock/cancel has completed and we haven't recvd a reply
897 	   to the op that was in progress prior to the unlock/cancel; we
898 	   give up on any reply to the earlier op.  FIXME: not sure when/how
899 	   this would happen */
900 
901 	if (overlap_done && lkb->lkb_wait_type) {
902 		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
903 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
904 		lkb->lkb_wait_count--;
905 		lkb->lkb_wait_type = 0;
906 	}
907 
908 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
909 
910 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
911 	lkb->lkb_wait_count--;
912 	if (!lkb->lkb_wait_count)
913 		list_del_init(&lkb->lkb_wait_reply);
914 	unhold_lkb(lkb);
915 	return 0;
916 }
917 
918 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
919 {
920 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
921 	int error;
922 
923 	mutex_lock(&ls->ls_waiters_mutex);
924 	error = _remove_from_waiters(lkb, mstype);
925 	mutex_unlock(&ls->ls_waiters_mutex);
926 	return error;
927 }
928 
929 /* Handles situations where we might be processing a "fake" or "stub" reply in
930    which we can't try to take waiters_mutex again. */
931 
932 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
933 {
934 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
935 	int error;
936 
937 	if (ms != &ls->ls_stub_ms)
938 		mutex_lock(&ls->ls_waiters_mutex);
939 	error = _remove_from_waiters(lkb, ms->m_type);
940 	if (ms != &ls->ls_stub_ms)
941 		mutex_unlock(&ls->ls_waiters_mutex);
942 	return error;
943 }
944 
945 static void dir_remove(struct dlm_rsb *r)
946 {
947 	int to_nodeid;
948 
949 	if (dlm_no_directory(r->res_ls))
950 		return;
951 
952 	to_nodeid = dlm_dir_nodeid(r);
953 	if (to_nodeid != dlm_our_nodeid())
954 		send_remove(r);
955 	else
956 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
957 				     r->res_name, r->res_length);
958 }
959 
960 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
961    found since they are in order of newest to oldest? */
962 
963 static int shrink_bucket(struct dlm_ls *ls, int b)
964 {
965 	struct dlm_rsb *r;
966 	int count = 0, found;
967 
968 	for (;;) {
969 		found = 0;
970 		write_lock(&ls->ls_rsbtbl[b].lock);
971 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
972 					    res_hashchain) {
973 			if (!time_after_eq(jiffies, r->res_toss_time +
974 					   dlm_config.ci_toss_secs * HZ))
975 				continue;
976 			found = 1;
977 			break;
978 		}
979 
980 		if (!found) {
981 			write_unlock(&ls->ls_rsbtbl[b].lock);
982 			break;
983 		}
984 
985 		if (kref_put(&r->res_ref, kill_rsb)) {
986 			list_del(&r->res_hashchain);
987 			write_unlock(&ls->ls_rsbtbl[b].lock);
988 
989 			if (is_master(r))
990 				dir_remove(r);
991 			free_rsb(r);
992 			count++;
993 		} else {
994 			write_unlock(&ls->ls_rsbtbl[b].lock);
995 			log_error(ls, "tossed rsb in use %s", r->res_name);
996 		}
997 	}
998 
999 	return count;
1000 }
1001 
1002 void dlm_scan_rsbs(struct dlm_ls *ls)
1003 {
1004 	int i;
1005 
1006 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1007 		shrink_bucket(ls, i);
1008 		if (dlm_locking_stopped(ls))
1009 			break;
1010 		cond_resched();
1011 	}
1012 }
1013 
1014 static void add_timeout(struct dlm_lkb *lkb)
1015 {
1016 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1017 
1018 	if (is_master_copy(lkb)) {
1019 		lkb->lkb_timestamp = jiffies;
1020 		return;
1021 	}
1022 
1023 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1024 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1025 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1026 		goto add_it;
1027 	}
1028 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1029 		goto add_it;
1030 	return;
1031 
1032  add_it:
1033 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1034 	mutex_lock(&ls->ls_timeout_mutex);
1035 	hold_lkb(lkb);
1036 	lkb->lkb_timestamp = jiffies;
1037 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1038 	mutex_unlock(&ls->ls_timeout_mutex);
1039 }
1040 
1041 static void del_timeout(struct dlm_lkb *lkb)
1042 {
1043 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1044 
1045 	mutex_lock(&ls->ls_timeout_mutex);
1046 	if (!list_empty(&lkb->lkb_time_list)) {
1047 		list_del_init(&lkb->lkb_time_list);
1048 		unhold_lkb(lkb);
1049 	}
1050 	mutex_unlock(&ls->ls_timeout_mutex);
1051 }
1052 
1053 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1054    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1055    and then lock rsb because of lock ordering in add_timeout.  We may need
1056    to specify some special timeout-related bits in the lkb that are just to
1057    be accessed under the timeout_mutex. */
1058 
1059 void dlm_scan_timeout(struct dlm_ls *ls)
1060 {
1061 	struct dlm_rsb *r;
1062 	struct dlm_lkb *lkb;
1063 	int do_cancel, do_warn;
1064 
1065 	for (;;) {
1066 		if (dlm_locking_stopped(ls))
1067 			break;
1068 
1069 		do_cancel = 0;
1070 		do_warn = 0;
1071 		mutex_lock(&ls->ls_timeout_mutex);
1072 		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1073 
1074 			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1075 			    time_after_eq(jiffies, lkb->lkb_timestamp +
1076 					  lkb->lkb_timeout_cs * HZ/100))
1077 				do_cancel = 1;
1078 
1079 			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1080 			    time_after_eq(jiffies, lkb->lkb_timestamp +
1081 				   	   dlm_config.ci_timewarn_cs * HZ/100))
1082 				do_warn = 1;
1083 
1084 			if (!do_cancel && !do_warn)
1085 				continue;
1086 			hold_lkb(lkb);
1087 			break;
1088 		}
1089 		mutex_unlock(&ls->ls_timeout_mutex);
1090 
1091 		if (!do_cancel && !do_warn)
1092 			break;
1093 
1094 		r = lkb->lkb_resource;
1095 		hold_rsb(r);
1096 		lock_rsb(r);
1097 
1098 		if (do_warn) {
1099 			/* clear flag so we only warn once */
1100 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1101 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1102 				del_timeout(lkb);
1103 			dlm_timeout_warn(lkb);
1104 		}
1105 
1106 		if (do_cancel) {
1107 			log_debug(ls, "timeout cancel %x node %d %s",
1108 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1109 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1110 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1111 			del_timeout(lkb);
1112 			_cancel_lock(r, lkb);
1113 		}
1114 
1115 		unlock_rsb(r);
1116 		unhold_rsb(r);
1117 		dlm_put_lkb(lkb);
1118 	}
1119 }
1120 
1121 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1122    dlm_recoverd before checking/setting ls_recover_begin. */
1123 
1124 void dlm_adjust_timeouts(struct dlm_ls *ls)
1125 {
1126 	struct dlm_lkb *lkb;
1127 	long adj = jiffies - ls->ls_recover_begin;
1128 
1129 	ls->ls_recover_begin = 0;
1130 	mutex_lock(&ls->ls_timeout_mutex);
1131 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1132 		lkb->lkb_timestamp += adj;
1133 	mutex_unlock(&ls->ls_timeout_mutex);
1134 }
1135 
1136 /* lkb is master or local copy */
1137 
1138 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1139 {
1140 	int b, len = r->res_ls->ls_lvblen;
1141 
1142 	/* b=1 lvb returned to caller
1143 	   b=0 lvb written to rsb or invalidated
1144 	   b=-1 do nothing */
1145 
1146 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1147 
1148 	if (b == 1) {
1149 		if (!lkb->lkb_lvbptr)
1150 			return;
1151 
1152 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1153 			return;
1154 
1155 		if (!r->res_lvbptr)
1156 			return;
1157 
1158 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1159 		lkb->lkb_lvbseq = r->res_lvbseq;
1160 
1161 	} else if (b == 0) {
1162 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1163 			rsb_set_flag(r, RSB_VALNOTVALID);
1164 			return;
1165 		}
1166 
1167 		if (!lkb->lkb_lvbptr)
1168 			return;
1169 
1170 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1171 			return;
1172 
1173 		if (!r->res_lvbptr)
1174 			r->res_lvbptr = allocate_lvb(r->res_ls);
1175 
1176 		if (!r->res_lvbptr)
1177 			return;
1178 
1179 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1180 		r->res_lvbseq++;
1181 		lkb->lkb_lvbseq = r->res_lvbseq;
1182 		rsb_clear_flag(r, RSB_VALNOTVALID);
1183 	}
1184 
1185 	if (rsb_flag(r, RSB_VALNOTVALID))
1186 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1187 }
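
/*
 * Illustrative cases of the three b values (modes assumed; results
 * read from the dlm_lvb_operations table above):
 *
 *	gr=NL rq=PR:  b ==  1, the rsb lvb is copied out to lkb_lvbptr
 *	gr=PW rq=NL:  b ==  0, lkb_lvbptr is written into the rsb lvb
 *	gr=CR rq=NL:  b == -1, the lvb is left untouched
 */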
1188 
1189 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1190 {
1191 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1192 		return;
1193 
1194 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1195 		rsb_set_flag(r, RSB_VALNOTVALID);
1196 		return;
1197 	}
1198 
1199 	if (!lkb->lkb_lvbptr)
1200 		return;
1201 
1202 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1203 		return;
1204 
1205 	if (!r->res_lvbptr)
1206 		r->res_lvbptr = allocate_lvb(r->res_ls);
1207 
1208 	if (!r->res_lvbptr)
1209 		return;
1210 
1211 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1212 	r->res_lvbseq++;
1213 	rsb_clear_flag(r, RSB_VALNOTVALID);
1214 }
1215 
1216 /* lkb is process copy (pc) */
1217 
1218 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1219 			    struct dlm_message *ms)
1220 {
1221 	int b;
1222 
1223 	if (!lkb->lkb_lvbptr)
1224 		return;
1225 
1226 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1227 		return;
1228 
1229 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1230 	if (b == 1) {
1231 		int len = receive_extralen(ms);
1232 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1233 		lkb->lkb_lvbseq = ms->m_lvbseq;
1234 	}
1235 }
1236 
1237 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1238    remove_lock -- used for unlock, removes lkb from granted
1239    revert_lock -- used for cancel, moves lkb from convert to granted
1240    grant_lock  -- used for request and convert, adds lkb to granted or
1241                   moves lkb from convert or waiting to granted
1242 
1243    Each of these is used for master or local copy lkb's.  There is
1244    also a _pc() variation used to make the corresponding change on
1245    a process copy (pc) lkb. */
1246 
1247 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1248 {
1249 	del_lkb(r, lkb);
1250 	lkb->lkb_grmode = DLM_LOCK_IV;
1251 	/* this unhold undoes the original ref from create_lkb()
1252 	   so this leads to the lkb being freed */
1253 	unhold_lkb(lkb);
1254 }
1255 
1256 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1257 {
1258 	set_lvb_unlock(r, lkb);
1259 	_remove_lock(r, lkb);
1260 }
1261 
1262 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1263 {
1264 	_remove_lock(r, lkb);
1265 }
1266 
1267 /* returns: 0 did nothing
1268 	    1 moved lock to granted
1269 	   -1 removed lock */
1270 
1271 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1272 {
1273 	int rv = 0;
1274 
1275 	lkb->lkb_rqmode = DLM_LOCK_IV;
1276 
1277 	switch (lkb->lkb_status) {
1278 	case DLM_LKSTS_GRANTED:
1279 		break;
1280 	case DLM_LKSTS_CONVERT:
1281 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1282 		rv = 1;
1283 		break;
1284 	case DLM_LKSTS_WAITING:
1285 		del_lkb(r, lkb);
1286 		lkb->lkb_grmode = DLM_LOCK_IV;
1287 		/* this unhold undoes the original ref from create_lkb()
1288 		   so this leads to the lkb being freed */
1289 		unhold_lkb(lkb);
1290 		rv = -1;
1291 		break;
1292 	default:
1293 		log_print("invalid status for revert %d", lkb->lkb_status);
1294 	}
1295 	return rv;
1296 }
1297 
1298 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299 {
1300 	return revert_lock(r, lkb);
1301 }
1302 
1303 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1304 {
1305 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1306 		lkb->lkb_grmode = lkb->lkb_rqmode;
1307 		if (lkb->lkb_status)
1308 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1309 		else
1310 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1311 	}
1312 
1313 	lkb->lkb_rqmode = DLM_LOCK_IV;
1314 }
1315 
1316 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1317 {
1318 	set_lvb_lock(r, lkb);
1319 	_grant_lock(r, lkb);
1320 	lkb->lkb_highbast = 0;
1321 }
1322 
1323 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1324 			  struct dlm_message *ms)
1325 {
1326 	set_lvb_lock_pc(r, lkb, ms);
1327 	_grant_lock(r, lkb);
1328 }
1329 
1330 /* called by grant_pending_locks() which means an async grant message must
1331    be sent to the requesting node in addition to granting the lock if the
1332    lkb belongs to a remote node. */
1333 
1334 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1335 {
1336 	grant_lock(r, lkb);
1337 	if (is_master_copy(lkb))
1338 		send_grant(r, lkb);
1339 	else
1340 		queue_cast(r, lkb, 0);
1341 }
1342 
1343 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1344    change the granted/requested modes.  We're munging things accordingly in
1345    the process copy.
1346    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1347    conversion deadlock
1348    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1349    compatible with other granted locks */
1350 
1351 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1352 {
1353 	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1354 		log_print("munge_demoted %x invalid reply type %d",
1355 			  lkb->lkb_id, ms->m_type);
1356 		return;
1357 	}
1358 
1359 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1360 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1361 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1362 		return;
1363 	}
1364 
1365 	lkb->lkb_grmode = DLM_LOCK_NL;
1366 }
1367 
1368 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1369 {
1370 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1371 	    ms->m_type != DLM_MSG_GRANT) {
1372 		log_print("munge_altmode %x invalid reply type %d",
1373 			  lkb->lkb_id, ms->m_type);
1374 		return;
1375 	}
1376 
1377 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1378 		lkb->lkb_rqmode = DLM_LOCK_PR;
1379 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1380 		lkb->lkb_rqmode = DLM_LOCK_CW;
1381 	else {
1382 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1383 		dlm_print_lkb(lkb);
1384 	}
1385 }
1386 
1387 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1388 {
1389 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1390 					   lkb_statequeue);
1391 	if (lkb->lkb_id == first->lkb_id)
1392 		return 1;
1393 
1394 	return 0;
1395 }
1396 
1397 /* Check if the given lkb conflicts with another lkb on the queue. */
1398 
1399 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1400 {
1401 	struct dlm_lkb *this;
1402 
1403 	list_for_each_entry(this, head, lkb_statequeue) {
1404 		if (this == lkb)
1405 			continue;
1406 		if (!modes_compat(this, lkb))
1407 			return 1;
1408 	}
1409 	return 0;
1410 }
1411 
1412 /*
1413  * "A conversion deadlock arises with a pair of lock requests in the converting
1414  * queue for one resource.  The granted mode of each lock blocks the requested
1415  * mode of the other lock."
1416  *
1417  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1418  * convert queue from being granted, then deadlk/demote lkb.
1419  *
1420  * Example:
1421  * Granted Queue: empty
1422  * Convert Queue: NL->EX (first lock)
1423  *                PR->EX (second lock)
1424  *
1425  * The first lock can't be granted because of the granted mode of the second
1426  * lock and the second lock can't be granted because it's not first in the
1427  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1428  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1429  * flag set and return DEMOTED in the lksb flags.
1430  *
1431  * Originally, this function detected conv-deadlk in a more limited scope:
1432  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1433  * - if lkb1 was the first entry in the queue (not just earlier), and was
1434  *   blocked by the granted mode of lkb2, and there was nothing on the
1435  *   granted queue preventing lkb1 from being granted immediately, i.e.
1436  *   lkb2 was the only thing preventing lkb1 from being granted.
1437  *
1438  * That second condition meant we'd only say there was conv-deadlk if
1439  * resolving it (by demotion) would lead to the first lock on the convert
1440  * queue being granted right away.  It allowed conversion deadlocks to exist
1441  * between locks on the convert queue while they couldn't be granted anyway.
1442  *
1443  * Now, we detect and take action on conversion deadlocks immediately when
1444  * they're created, even if they may not be immediately consequential.  If
1445  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1446  * mode that would prevent lkb1's conversion from being granted, we do a
1447  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1448  * I think this means that the lkb_is_ahead condition below should always
1449  * be zero, i.e. there will never be conv-deadlk between two locks that are
1450  * both already on the convert queue.
1451  */
1452 
1453 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1454 {
1455 	struct dlm_lkb *lkb1;
1456 	int lkb_is_ahead = 0;
1457 
1458 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1459 		if (lkb1 == lkb2) {
1460 			lkb_is_ahead = 1;
1461 			continue;
1462 		}
1463 
1464 		if (!lkb_is_ahead) {
1465 			if (!modes_compat(lkb2, lkb1))
1466 				return 1;
1467 		} else {
1468 			if (!modes_compat(lkb2, lkb1) &&
1469 			    !modes_compat(lkb1, lkb2))
1470 				return 1;
1471 		}
1472 	}
1473 	return 0;
1474 }
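
/*
 * Walking the NL->EX / PR->EX example from the comment above through
 * this function: lkb2 (gr=PR, rq=EX) is being checked while lkb1
 * (gr=NL, rq=EX) is ahead of it on the convert queue:
 *
 *	lkb1 != lkb2, lkb_is_ahead == 0:
 *		modes_compat(lkb2, lkb1)
 *		    == __dlm_compat_matrix[DLM_LOCK_PR+1][DLM_LOCK_EX+1]
 *		    == 0
 *	return 1;	lkb2's granted mode blocks lkb1's conversion
 */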
1475 
1476 /*
1477  * Return 1 if the lock can be granted, 0 otherwise.
1478  * Also detect and resolve conversion deadlocks.
1479  *
1480  * lkb is the lock to be granted
1481  *
1482  * now is 1 if the function is being called in the context of the
1483  * immediate request, it is 0 if called later, after the lock has been
1484  * queued.
1485  *
1486  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1487  */
1488 
1489 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1490 {
1491 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1492 
1493 	/*
1494 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1495 	 * a new request for a NL mode lock being blocked.
1496 	 *
1497 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1498 	 * request, then it would be granted.  In essence, the use of this flag
1499 	 * tells the Lock Manager to expedite this request by not considering
1500 	 * what may be in the CONVERTING or WAITING queues...  As of this
1501 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1502 	 * mode locks.  This flag is not valid for conversion requests.
1503 	 *
1504 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1505 	 * conversion or used with a non-NL requested mode.  We also know an
1506 	 * EXPEDITE request is always granted immediately, so now must always
1507 	 * be 1.  The full condition to grant an expedite request: (now &&
1508 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1509 	 * therefore be shortened to just checking the flag.
1510 	 */
1511 
1512 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1513 		return 1;
1514 
1515 	/*
1516 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1517 	 * added to the remaining conditions.
1518 	 */
1519 
1520 	if (queue_conflict(&r->res_grantqueue, lkb))
1521 		goto out;
1522 
1523 	/*
1524 	 * 6-3: By default, a conversion request is immediately granted if the
1525 	 * requested mode is compatible with the modes of all other granted
1526 	 * locks
1527 	 */
1528 
1529 	if (queue_conflict(&r->res_convertqueue, lkb))
1530 		goto out;
1531 
1532 	/*
1533 	 * 6-5: But the default algorithm for deciding whether to grant or
1534 	 * queue conversion requests does not by itself guarantee that such
1535 	 * requests are serviced on a "first come first serve" basis.  This, in
1536 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1537 	 *
1538 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1539 	 * the system service employed to request a lock conversion.  This flag
1540 	 * forces certain conversion requests to be queued, even if they are
1541 	 * compatible with the granted modes of other locks on the same
1542 	 * resource.  Thus, the use of this flag results in conversion requests
1543 	 * being ordered on a "first come first serve" basis.
1544 	 *
1545 	 * DCT: This condition is all about new conversions being able to occur
1546 	 * "in place" while the lock remains on the granted queue (assuming
1547 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1548 	 * doesn't _have_ to go onto the convert queue where it's processed in
1549 	 * order.  The "now" variable is necessary to distinguish converts
1550 	 * being received and processed for the first time now, because once a
1551 	 * convert is moved to the conversion queue the condition below applies
1552 	 * requiring fifo granting.
1553 	 */
1554 
1555 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1556 		return 1;
1557 
1558 	/*
1559 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1560 	 * order.
1561 	 */
1562 
1563 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1564 		return 1;
1565 
1566 	/*
1567 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1568 	 * granted until all other conversion requests ahead of it are granted
1569 	 * and/or canceled.
1570 	 */
1571 
1572 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1573 		return 1;
1574 
1575 	/*
1576 	 * 6-4: By default, a new request is immediately granted only if all
1577 	 * three of the following conditions are satisfied when the request is
1578 	 * issued:
1579 	 * - The queue of ungranted conversion requests for the resource is
1580 	 *   empty.
1581 	 * - The queue of ungranted new requests for the resource is empty.
1582 	 * - The mode of the new request is compatible with the most
1583 	 *   restrictive mode of all granted locks on the resource.
1584 	 */
1585 
1586 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1587 	    list_empty(&r->res_waitqueue))
1588 		return 1;
1589 
1590 	/*
1591 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1592 	 * it cannot be granted until the queue of ungranted conversion
1593 	 * requests is empty, all ungranted new requests ahead of it are
1594 	 * granted and/or canceled, and it is compatible with the granted mode
1595 	 * of the most restrictive lock granted on the resource.
1596 	 */
1597 
1598 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1599 	    first_in_list(lkb, &r->res_waitqueue))
1600 		return 1;
1601  out:
1602 	return 0;
1603 }
1604 
1605 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1606 			  int *err)
1607 {
1608 	int rv;
1609 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1610 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1611 
1612 	if (err)
1613 		*err = 0;
1614 
1615 	rv = _can_be_granted(r, lkb, now);
1616 	if (rv)
1617 		goto out;
1618 
1619 	/*
1620 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1621 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1622 	 * cancels one of the locks.
1623 	 */
1624 
1625 	if (is_convert && can_be_queued(lkb) &&
1626 	    conversion_deadlock_detect(r, lkb)) {
1627 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1628 			lkb->lkb_grmode = DLM_LOCK_NL;
1629 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1630 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1631 			if (err)
1632 				*err = -EDEADLK;
1633 			else {
1634 				log_print("can_be_granted deadlock %x now %d",
1635 					  lkb->lkb_id, now);
1636 				dlm_dump_rsb(r);
1637 			}
1638 		}
1639 		goto out;
1640 	}
1641 
1642 	/*
1643 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1644 	 * to grant a request in a mode other than the normal rqmode.  It's a
1645 	 * simple way to provide a big optimization to applications that can
1646 	 * use them.
1647 	 */
1648 
1649 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1650 		alt = DLM_LOCK_PR;
1651 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1652 		alt = DLM_LOCK_CW;
1653 
1654 	if (alt) {
1655 		lkb->lkb_rqmode = alt;
1656 		rv = _can_be_granted(r, lkb, now);
1657 		if (rv)
1658 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1659 		else
1660 			lkb->lkb_rqmode = rqmode;
1661 	}
1662  out:
1663 	return rv;
1664 }
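
/*
 * An assumed example of the alt-mode retry above: a CW request
 * carrying DLM_LKF_ALTPR that cannot be granted as CW:
 *
 *	rv = _can_be_granted(r, lkb, now);	rv == 0, rqmode CW
 *	lkb->lkb_rqmode = DLM_LOCK_PR;
 *	rv = _can_be_granted(r, lkb, now);
 *	rv == 1: granted in PR, caller sees DLM_SBF_ALTMODE
 *	rv == 0: rqmode restored to CW, request queues normally
 */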
1665 
1666 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1667    for locks pending on the convert list.  Once verified (watch for these
1668    log_prints), we should be able to just call _can_be_granted() and not
1669    bother with the demote/deadlk cases here (and there's no easy way to deal
1670    with a deadlk here, we'd have to generate something like grant_lock with
1671    the deadlk error.) */
1672 
1673 /* returns the highest requested mode of all blocked conversions */
1674 
1675 static int grant_pending_convert(struct dlm_rsb *r, int high)
1676 {
1677 	struct dlm_lkb *lkb, *s;
1678 	int hi, demoted, quit, grant_restart, demote_restart;
1679 	int deadlk;
1680 
1681 	quit = 0;
1682  restart:
1683 	grant_restart = 0;
1684 	demote_restart = 0;
1685 	hi = DLM_LOCK_IV;
1686 
1687 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1688 		demoted = is_demoted(lkb);
1689 		deadlk = 0;
1690 
1691 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1692 			grant_lock_pending(r, lkb);
1693 			grant_restart = 1;
1694 			continue;
1695 		}
1696 
1697 		if (!demoted && is_demoted(lkb)) {
1698 			log_print("WARN: pending demoted %x node %d %s",
1699 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1700 			demote_restart = 1;
1701 			continue;
1702 		}
1703 
1704 		if (deadlk) {
1705 			log_print("WARN: pending deadlock %x node %d %s",
1706 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1707 			dlm_dump_rsb(r);
1708 			continue;
1709 		}
1710 
1711 		hi = max_t(int, lkb->lkb_rqmode, hi);
1712 	}
1713 
1714 	if (grant_restart)
1715 		goto restart;
1716 	if (demote_restart && !quit) {
1717 		quit = 1;
1718 		goto restart;
1719 	}
1720 
1721 	return max_t(int, high, hi);
1722 }
1723 
1724 static int grant_pending_wait(struct dlm_rsb *r, int high)
1725 {
1726 	struct dlm_lkb *lkb, *s;
1727 
1728 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1729 		if (can_be_granted(r, lkb, 0, NULL))
1730 			grant_lock_pending(r, lkb);
1731 		else
1732 			high = max_t(int, lkb->lkb_rqmode, high);
1733 	}
1734 
1735 	return high;
1736 }
1737 
1738 static void grant_pending_locks(struct dlm_rsb *r)
1739 {
1740 	struct dlm_lkb *lkb, *s;
1741 	int high = DLM_LOCK_IV;
1742 
1743 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1744 
1745 	high = grant_pending_convert(r, high);
1746 	high = grant_pending_wait(r, high);
1747 
1748 	if (high == DLM_LOCK_IV)
1749 		return;
1750 
1751 	/*
1752 	 * If there are locks left on the wait/convert queue then send blocking
1753 	 * ASTs to granted locks based on the largest requested mode (high)
1754 	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1755 	 */
1756 
1757 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1758 		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1759 		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1760 			queue_bast(r, lkb, high);
1761 			lkb->lkb_highbast = high;
1762 		}
1763 	}
1764 }
1765 
1766 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1767 			    struct dlm_lkb *lkb)
1768 {
1769 	struct dlm_lkb *gr;
1770 
1771 	list_for_each_entry(gr, head, lkb_statequeue) {
1772 		if (gr->lkb_bastaddr &&
1773 		    gr->lkb_highbast < lkb->lkb_rqmode &&
1774 		    !modes_compat(gr, lkb)) {
1775 			queue_bast(r, gr, lkb->lkb_rqmode);
1776 			gr->lkb_highbast = lkb->lkb_rqmode;
1777 		}
1778 	}
1779 }
1780 
1781 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1782 {
1783 	send_bast_queue(r, &r->res_grantqueue, lkb);
1784 }
1785 
1786 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1787 {
1788 	send_bast_queue(r, &r->res_grantqueue, lkb);
1789 	send_bast_queue(r, &r->res_convertqueue, lkb);
1790 }
1791 
1792 /* set_master(r, lkb) -- set the master nodeid of a resource
1793 
1794    The purpose of this function is to set the nodeid field in the given
1795    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1796    known, it can just be copied to the lkb and the function will return
1797    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1798    before it can be copied to the lkb.
1799 
1800    When the rsb nodeid is being looked up remotely, the initial lkb
1801    causing the lookup is kept on the ls_waiters list waiting for the
1802    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1803    on the rsb's res_lookup list until the master is verified.
1804 
1805    Return values:
1806    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1807    1: the rsb master is not available and the lkb has been placed on
1808       a wait queue
1809 */
1810 
1811 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1812 {
1813 	struct dlm_ls *ls = r->res_ls;
1814 	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1815 
1816 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1817 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1818 		r->res_first_lkid = lkb->lkb_id;
1819 		lkb->lkb_nodeid = r->res_nodeid;
1820 		return 0;
1821 	}
1822 
1823 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1824 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1825 		return 1;
1826 	}
1827 
1828 	if (r->res_nodeid == 0) {
1829 		lkb->lkb_nodeid = 0;
1830 		return 0;
1831 	}
1832 
1833 	if (r->res_nodeid > 0) {
1834 		lkb->lkb_nodeid = r->res_nodeid;
1835 		return 0;
1836 	}
1837 
1838 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1839 
1840 	dir_nodeid = dlm_dir_nodeid(r);
1841 
1842 	if (dir_nodeid != our_nodeid) {
1843 		r->res_first_lkid = lkb->lkb_id;
1844 		send_lookup(r, lkb);
1845 		return 1;
1846 	}
1847 
1848 	for (;;) {
1849 		/* It's possible for dlm_scand to remove an old rsb for
1850 		   this same resource from the toss list, for us to create
1851 		   a new one, look up the master locally, and find the
1852 		   directory entry still exists just before dlm_scand
1853 		   does the dir_remove() on the previous rsb. */
1854 
1855 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1856 				       r->res_length, &ret_nodeid);
1857 		if (!error)
1858 			break;
1859 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1860 		schedule();
1861 	}
1862 
1863 	if (ret_nodeid == our_nodeid) {
1864 		r->res_first_lkid = 0;
1865 		r->res_nodeid = 0;
1866 		lkb->lkb_nodeid = 0;
1867 	} else {
1868 		r->res_first_lkid = lkb->lkb_id;
1869 		r->res_nodeid = ret_nodeid;
1870 		lkb->lkb_nodeid = ret_nodeid;
1871 	}
1872 	return 0;
1873 }
1874 
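/* process_lookup_list -- now that the rsb's master is known, restart the
   request for each lkb that was queued on res_lookup waiting for the
   lookup reply */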
1875 static void process_lookup_list(struct dlm_rsb *r)
1876 {
1877 	struct dlm_lkb *lkb, *safe;
1878 
1879 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1880 		list_del_init(&lkb->lkb_rsb_lookup);
1881 		_request_lock(r, lkb);
1882 		schedule();
1883 	}
1884 }
1885 
1886 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1887 
1888 static void confirm_master(struct dlm_rsb *r, int error)
1889 {
1890 	struct dlm_lkb *lkb;
1891 
1892 	if (!r->res_first_lkid)
1893 		return;
1894 
1895 	switch (error) {
1896 	case 0:
1897 	case -EINPROGRESS:
1898 		r->res_first_lkid = 0;
1899 		process_lookup_list(r);
1900 		break;
1901 
1902 	case -EAGAIN:
1903 		/* the remote master didn't queue our NOQUEUE request;
1904 		   make a waiting lkb the first_lkid */
1905 
1906 		r->res_first_lkid = 0;
1907 
1908 		if (!list_empty(&r->res_lookup)) {
1909 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1910 					 lkb_rsb_lookup);
1911 			list_del_init(&lkb->lkb_rsb_lookup);
1912 			r->res_first_lkid = lkb->lkb_id;
1913 			_request_lock(r, lkb);
1914 		} else
1915 			r->res_nodeid = -1;
1916 		break;
1917 
1918 	default:
1919 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1920 	}
1921 }
1922 
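/* set_lock_args -- screen dlm_lock() args for invalid values and flag
   combinations, then stash them in dlm_args; they are copied into the lkb
   later by validate_lock_args() once the rsb is locked */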
1923 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1924 			 int namelen, unsigned long timeout_cs, void *ast,
1925 			 void *astarg, void *bast, struct dlm_args *args)
1926 {
1927 	int rv = -EINVAL;
1928 
1929 	/* check for invalid arg usage */
1930 
1931 	if (mode < 0 || mode > DLM_LOCK_EX)
1932 		goto out;
1933 
1934 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1935 		goto out;
1936 
1937 	if (flags & DLM_LKF_CANCEL)
1938 		goto out;
1939 
1940 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1941 		goto out;
1942 
1943 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1944 		goto out;
1945 
1946 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1947 		goto out;
1948 
1949 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1950 		goto out;
1951 
1952 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1953 		goto out;
1954 
1955 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1956 		goto out;
1957 
1958 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1959 		goto out;
1960 
1961 	if (!ast || !lksb)
1962 		goto out;
1963 
1964 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1965 		goto out;
1966 
1967 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1968 		goto out;
1969 
1970 	/* these args will be copied to the lkb in validate_lock_args,
1971 	   it cannot be done now because when converting locks, fields in
1972 	   an active lkb cannot be modified before locking the rsb */
1973 
1974 	args->flags = flags;
1975 	args->astaddr = ast;
1976 	args->astparam = (long) astarg;
1977 	args->bastaddr = bast;
1978 	args->timeout = timeout_cs;
1979 	args->mode = mode;
1980 	args->lksb = lksb;
1981 	rv = 0;
1982  out:
1983 	return rv;
1984 }
1985 
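/* set_unlock_args -- the same screening for dlm_unlock(); only unlock/cancel
   related flags are accepted, and CANCEL and FORCEUNLOCK are mutually
   exclusive */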
1986 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1987 {
1988 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1989 		      DLM_LKF_FORCEUNLOCK))
1990 		return -EINVAL;
1991 
1992 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1993 		return -EINVAL;
1994 
1995 	args->flags = flags;
1996 	args->astparam = (long) astarg;
1997 	return 0;
1998 }
1999 
2000 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2001 			      struct dlm_args *args)
2002 {
2003 	int rv = -EINVAL;
2004 
2005 	if (args->flags & DLM_LKF_CONVERT) {
2006 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2007 			goto out;
2008 
2009 		if (args->flags & DLM_LKF_QUECVT &&
2010 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2011 			goto out;
2012 
2013 		rv = -EBUSY;
2014 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2015 			goto out;
2016 
2017 		if (lkb->lkb_wait_type)
2018 			goto out;
2019 
2020 		if (is_overlap(lkb))
2021 			goto out;
2022 	}
2023 
2024 	lkb->lkb_exflags = args->flags;
2025 	lkb->lkb_sbflags = 0;
2026 	lkb->lkb_astaddr = args->astaddr;
2027 	lkb->lkb_astparam = args->astparam;
2028 	lkb->lkb_bastaddr = args->bastaddr;
2029 	lkb->lkb_rqmode = args->mode;
2030 	lkb->lkb_lksb = args->lksb;
2031 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2032 	lkb->lkb_ownpid = (int) current->pid;
2033 	lkb->lkb_timeout_cs = args->timeout;
2034 	rv = 0;
2035  out:
2036 	return rv;
2037 }
2038 
2039 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2040    for success */
2041 
2042 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2043    because there may be a lookup in progress and it's valid to do
2044    cancel/force-unlock on it */
2045 
2046 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2047 {
2048 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2049 	int rv = -EINVAL;
2050 
2051 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2052 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2053 		dlm_print_lkb(lkb);
2054 		goto out;
2055 	}
2056 
2057 	/* an lkb may still exist even though the lock is EOL'ed due to a
2058 	   cancel, unlock or failed noqueue request; an app can't use these
2059 	   locks; return same error as if the lkid had not been found at all */
2060 
2061 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2062 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2063 		rv = -ENOENT;
2064 		goto out;
2065 	}
2066 
2067 	/* an lkb may be waiting for an rsb lookup to complete where the
2068 	   lookup was initiated by another lock */
2069 
2070 	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2071 		if (!list_empty(&lkb->lkb_rsb_lookup)) {
2072 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2073 			list_del_init(&lkb->lkb_rsb_lookup);
2074 			queue_cast(lkb->lkb_resource, lkb,
2075 				   args->flags & DLM_LKF_CANCEL ?
2076 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2077 			unhold_lkb(lkb); /* undoes create_lkb() */
2078 			rv = -EBUSY;
2079 			goto out;
2080 		}
2081 	}
2082 
2083 	/* cancel not allowed with another cancel/unlock in progress */
2084 
2085 	if (args->flags & DLM_LKF_CANCEL) {
2086 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2087 			goto out;
2088 
2089 		if (is_overlap(lkb))
2090 			goto out;
2091 
2092 		/* don't let scand try to do a cancel */
2093 		del_timeout(lkb);
2094 
2095 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2096 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2097 			rv = -EBUSY;
2098 			goto out;
2099 		}
2100 
2101 		switch (lkb->lkb_wait_type) {
2102 		case DLM_MSG_LOOKUP:
2103 		case DLM_MSG_REQUEST:
2104 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2105 			rv = -EBUSY;
2106 			goto out;
2107 		case DLM_MSG_UNLOCK:
2108 		case DLM_MSG_CANCEL:
2109 			goto out;
2110 		}
2111 		/* add_to_waiters() will set OVERLAP_CANCEL */
2112 		goto out_ok;
2113 	}
2114 
2115 	/* do we need to allow a force-unlock if there's a normal unlock
2116 	   already in progress?  in what conditions could the normal unlock
2117 	   fail such that we'd want to send a force-unlock to be sure? */
2118 
2119 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2120 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2121 			goto out;
2122 
2123 		if (is_overlap_unlock(lkb))
2124 			goto out;
2125 
2126 		/* don't let scand try to do a cancel */
2127 		del_timeout(lkb);
2128 
2129 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2130 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2131 			rv = -EBUSY;
2132 			goto out;
2133 		}
2134 
2135 		switch (lkb->lkb_wait_type) {
2136 		case DLM_MSG_LOOKUP:
2137 		case DLM_MSG_REQUEST:
2138 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2139 			rv = -EBUSY;
2140 			goto out;
2141 		case DLM_MSG_UNLOCK:
2142 			goto out;
2143 		}
2144 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2145 		goto out_ok;
2146 	}
2147 
2148 	/* normal unlock not allowed if there's any op in progress */
2149 	rv = -EBUSY;
2150 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2151 		goto out;
2152 
2153  out_ok:
2154 	/* an overlapping op shouldn't blow away exflags from other op */
2155 	lkb->lkb_exflags |= args->flags;
2156 	lkb->lkb_sbflags = 0;
2157 	lkb->lkb_astparam = args->astparam;
2158 	rv = 0;
2159  out:
2160 	if (rv)
2161 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2162 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2163 			  args->flags, lkb->lkb_wait_type,
2164 			  lkb->lkb_resource->res_name);
2165 	return rv;
2166 }
2167 
2168 /*
2169  * Four stage 4 varieties:
2170  * do_request(), do_convert(), do_unlock(), do_cancel()
2171  * These are called on the master node for the given lock and
2172  * from the central locking logic.
2173  */
2174 
2175 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2176 {
2177 	int error = 0;
2178 
2179 	if (can_be_granted(r, lkb, 1, NULL)) {
2180 		grant_lock(r, lkb);
2181 		queue_cast(r, lkb, 0);
2182 		goto out;
2183 	}
2184 
2185 	if (can_be_queued(lkb)) {
2186 		error = -EINPROGRESS;
2187 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2188 		send_blocking_asts(r, lkb);
2189 		add_timeout(lkb);
2190 		goto out;
2191 	}
2192 
2193 	error = -EAGAIN;
2194 	if (force_blocking_asts(lkb))
2195 		send_blocking_asts_all(r, lkb);
2196 	queue_cast(r, lkb, -EAGAIN);
2197 
2198  out:
2199 	return error;
2200 }
2201 
2202 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2203 {
2204 	int error = 0;
2205 	int deadlk = 0;
2206 
2207 	/* changing an existing lock may allow others to be granted */
2208 
2209 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2210 		grant_lock(r, lkb);
2211 		queue_cast(r, lkb, 0);
2212 		grant_pending_locks(r);
2213 		goto out;
2214 	}
2215 
2216 	/* can_be_granted() detected that this lock would block in a conversion
2217 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2218 	   the ast for the convert. */
2219 
2220 	if (deadlk) {
2221 		/* it's left on the granted queue */
2222 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2223 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2224 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2225 		revert_lock(r, lkb);
2226 		queue_cast(r, lkb, -EDEADLK);
2227 		error = -EDEADLK;
2228 		goto out;
2229 	}
2230 
2231 	/* is_demoted() means the can_be_granted() above set the grmode
2232 	   to NL, and left us on the granted queue.  This auto-demotion
2233 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2234 	   now grantable.  We have to try to grant other converting locks
2235 	   before we try again to grant this one. */
2236 
2237 	if (is_demoted(lkb)) {
2238 		grant_pending_convert(r, DLM_LOCK_IV);
2239 		if (_can_be_granted(r, lkb, 1)) {
2240 			grant_lock(r, lkb);
2241 			queue_cast(r, lkb, 0);
2242 			grant_pending_locks(r);
2243 			goto out;
2244 		}
2245 		/* else fall through and move to convert queue */
2246 	}
2247 
2248 	if (can_be_queued(lkb)) {
2249 		error = -EINPROGRESS;
2250 		del_lkb(r, lkb);
2251 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2252 		send_blocking_asts(r, lkb);
2253 		add_timeout(lkb);
2254 		goto out;
2255 	}
2256 
2257 	error = -EAGAIN;
2258 	if (force_blocking_asts(lkb))
2259 		send_blocking_asts_all(r, lkb);
2260 	queue_cast(r, lkb, -EAGAIN);
2261 
2262  out:
2263 	return error;
2264 }
2265 
2266 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2267 {
2268 	remove_lock(r, lkb);
2269 	queue_cast(r, lkb, -DLM_EUNLOCK);
2270 	grant_pending_locks(r);
2271 	return -DLM_EUNLOCK;
2272 }
2273 
2274 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2275 
2276 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2277 {
2278 	int error;
2279 
2280 	error = revert_lock(r, lkb);
2281 	if (error) {
2282 		queue_cast(r, lkb, -DLM_ECANCEL);
2283 		grant_pending_locks(r);
2284 		return -DLM_ECANCEL;
2285 	}
2286 	return 0;
2287 }
2288 
2289 /*
2290  * Four stage 3 varieties:
2291  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2292  */
2293 
2294 /* add a new lkb to a possibly new rsb, called by requesting process */
2295 
2296 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2297 {
2298 	int error;
2299 
2300 	/* set_master: sets lkb nodeid from r */
2301 
2302 	error = set_master(r, lkb);
2303 	if (error < 0)
2304 		goto out;
2305 	if (error) {
2306 		error = 0;
2307 		goto out;
2308 	}
2309 
2310 	if (is_remote(r))
2311 		/* receive_request() calls do_request() on remote node */
2312 		error = send_request(r, lkb);
2313 	else
2314 		error = do_request(r, lkb);
2315  out:
2316 	return error;
2317 }
2318 
2319 /* change some property of an existing lkb, e.g. mode */
2320 
2321 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2322 {
2323 	int error;
2324 
2325 	if (is_remote(r))
2326 		/* receive_convert() calls do_convert() on remote node */
2327 		error = send_convert(r, lkb);
2328 	else
2329 		error = do_convert(r, lkb);
2330 
2331 	return error;
2332 }
2333 
2334 /* remove an existing lkb from the granted queue */
2335 
2336 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2337 {
2338 	int error;
2339 
2340 	if (is_remote(r))
2341 		/* receive_unlock() calls do_unlock() on remote node */
2342 		error = send_unlock(r, lkb);
2343 	else
2344 		error = do_unlock(r, lkb);
2345 
2346 	return error;
2347 }
2348 
2349 /* remove an existing lkb from the convert or wait queue */
2350 
2351 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2352 {
2353 	int error;
2354 
2355 	if (is_remote(r))
2356 		/* receive_cancel() calls do_cancel() on remote node */
2357 		error = send_cancel(r, lkb);
2358 	else
2359 		error = do_cancel(r, lkb);
2360 
2361 	return error;
2362 }
2363 
2364 /*
2365  * Four stage 2 varieties:
2366  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2367  */
2368 
2369 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2370 			int len, struct dlm_args *args)
2371 {
2372 	struct dlm_rsb *r;
2373 	int error;
2374 
2375 	error = validate_lock_args(ls, lkb, args);
2376 	if (error)
2377 		goto out;
2378 
2379 	error = find_rsb(ls, name, len, R_CREATE, &r);
2380 	if (error)
2381 		goto out;
2382 
2383 	lock_rsb(r);
2384 
2385 	attach_lkb(r, lkb);
2386 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2387 
2388 	error = _request_lock(r, lkb);
2389 
2390 	unlock_rsb(r);
2391 	put_rsb(r);
2392 
2393  out:
2394 	return error;
2395 }
2396 
2397 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2398 			struct dlm_args *args)
2399 {
2400 	struct dlm_rsb *r;
2401 	int error;
2402 
2403 	r = lkb->lkb_resource;
2404 
2405 	hold_rsb(r);
2406 	lock_rsb(r);
2407 
2408 	error = validate_lock_args(ls, lkb, args);
2409 	if (error)
2410 		goto out;
2411 
2412 	error = _convert_lock(r, lkb);
2413  out:
2414 	unlock_rsb(r);
2415 	put_rsb(r);
2416 	return error;
2417 }
2418 
2419 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2420 		       struct dlm_args *args)
2421 {
2422 	struct dlm_rsb *r;
2423 	int error;
2424 
2425 	r = lkb->lkb_resource;
2426 
2427 	hold_rsb(r);
2428 	lock_rsb(r);
2429 
2430 	error = validate_unlock_args(lkb, args);
2431 	if (error)
2432 		goto out;
2433 
2434 	error = _unlock_lock(r, lkb);
2435  out:
2436 	unlock_rsb(r);
2437 	put_rsb(r);
2438 	return error;
2439 }
2440 
2441 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2442 		       struct dlm_args *args)
2443 {
2444 	struct dlm_rsb *r;
2445 	int error;
2446 
2447 	r = lkb->lkb_resource;
2448 
2449 	hold_rsb(r);
2450 	lock_rsb(r);
2451 
2452 	error = validate_unlock_args(lkb, args);
2453 	if (error)
2454 		goto out;
2455 
2456 	error = _cancel_lock(r, lkb);
2457  out:
2458 	unlock_rsb(r);
2459 	put_rsb(r);
2460 	return error;
2461 }
2462 
2463 /*
2464  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2465  */
2466 
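/* A minimal caller sketch (hypothetical names "ls", "my_ast" and resource
   "myres"; error handling elided), assuming the common pattern of waiting
   for the completion ast before inspecting the lock status:

	static void my_ast(void *astarg)
	{
		complete((struct completion *) astarg);
	}

	struct dlm_lksb lksb;
	struct completion done;
	int error;

	init_completion(&done);
	memset(&lksb, 0, sizeof(lksb));

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 my_ast, &done, NULL);
	if (!error) {
		wait_for_completion(&done);
		error = lksb.sb_status;
	}

   The lock is later dropped with dlm_unlock(ls, lksb.sb_lkid, 0, &lksb,
   &done), which fires my_ast again with sb_status set to -DLM_EUNLOCK. */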
2467 int dlm_lock(dlm_lockspace_t *lockspace,
2468 	     int mode,
2469 	     struct dlm_lksb *lksb,
2470 	     uint32_t flags,
2471 	     void *name,
2472 	     unsigned int namelen,
2473 	     uint32_t parent_lkid,
2474 	     void (*ast) (void *astarg),
2475 	     void *astarg,
2476 	     void (*bast) (void *astarg, int mode))
2477 {
2478 	struct dlm_ls *ls;
2479 	struct dlm_lkb *lkb;
2480 	struct dlm_args args;
2481 	int error, convert = flags & DLM_LKF_CONVERT;
2482 
2483 	ls = dlm_find_lockspace_local(lockspace);
2484 	if (!ls)
2485 		return -EINVAL;
2486 
2487 	dlm_lock_recovery(ls);
2488 
2489 	if (convert)
2490 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2491 	else
2492 		error = create_lkb(ls, &lkb);
2493 
2494 	if (error)
2495 		goto out;
2496 
2497 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2498 			      astarg, bast, &args);
2499 	if (error)
2500 		goto out_put;
2501 
2502 	if (convert)
2503 		error = convert_lock(ls, lkb, &args);
2504 	else
2505 		error = request_lock(ls, lkb, name, namelen, &args);
2506 
2507 	if (error == -EINPROGRESS)
2508 		error = 0;
2509  out_put:
2510 	if (convert || error)
2511 		__put_lkb(ls, lkb);
2512 	if (error == -EAGAIN || error == -EDEADLK)
2513 		error = 0;
2514  out:
2515 	dlm_unlock_recovery(ls);
2516 	dlm_put_lockspace(ls);
2517 	return error;
2518 }
2519 
2520 int dlm_unlock(dlm_lockspace_t *lockspace,
2521 	       uint32_t lkid,
2522 	       uint32_t flags,
2523 	       struct dlm_lksb *lksb,
2524 	       void *astarg)
2525 {
2526 	struct dlm_ls *ls;
2527 	struct dlm_lkb *lkb;
2528 	struct dlm_args args;
2529 	int error;
2530 
2531 	ls = dlm_find_lockspace_local(lockspace);
2532 	if (!ls)
2533 		return -EINVAL;
2534 
2535 	dlm_lock_recovery(ls);
2536 
2537 	error = find_lkb(ls, lkid, &lkb);
2538 	if (error)
2539 		goto out;
2540 
2541 	error = set_unlock_args(flags, astarg, &args);
2542 	if (error)
2543 		goto out_put;
2544 
2545 	if (flags & DLM_LKF_CANCEL)
2546 		error = cancel_lock(ls, lkb, &args);
2547 	else
2548 		error = unlock_lock(ls, lkb, &args);
2549 
2550 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2551 		error = 0;
2552 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2553 		error = 0;
2554  out_put:
2555 	dlm_put_lkb(lkb);
2556  out:
2557 	dlm_unlock_recovery(ls);
2558 	dlm_put_lockspace(ls);
2559 	return error;
2560 }
2561 
2562 /*
2563  * send/receive routines for remote operations and replies
2564  *
2565  * send_args
2566  * send_common
2567  * send_request			receive_request
2568  * send_convert			receive_convert
2569  * send_unlock			receive_unlock
2570  * send_cancel			receive_cancel
2571  * send_grant			receive_grant
2572  * send_bast			receive_bast
2573  * send_lookup			receive_lookup
2574  * send_remove			receive_remove
2575  *
2576  * 				send_common_reply
2577  * receive_request_reply	send_request_reply
2578  * receive_convert_reply	send_convert_reply
2579  * receive_unlock_reply		send_unlock_reply
2580  * receive_cancel_reply		send_cancel_reply
2581  * receive_lookup_reply		send_lookup_reply
2582  */
2583 
2584 static int _create_message(struct dlm_ls *ls, int mb_len,
2585 			   int to_nodeid, int mstype,
2586 			   struct dlm_message **ms_ret,
2587 			   struct dlm_mhandle **mh_ret)
2588 {
2589 	struct dlm_message *ms;
2590 	struct dlm_mhandle *mh;
2591 	char *mb;
2592 
2593 	/* get_buffer gives us a message handle (mh) that we need to
2594 	   pass into lowcomms_commit and a message buffer (mb) that we
2595 	   write our data into */
2596 
2597 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2598 	if (!mh)
2599 		return -ENOBUFS;
2600 
2601 	memset(mb, 0, mb_len);
2602 
2603 	ms = (struct dlm_message *) mb;
2604 
2605 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2606 	ms->m_header.h_lockspace = ls->ls_global_id;
2607 	ms->m_header.h_nodeid = dlm_our_nodeid();
2608 	ms->m_header.h_length = mb_len;
2609 	ms->m_header.h_cmd = DLM_MSG;
2610 
2611 	ms->m_type = mstype;
2612 
2613 	*mh_ret = mh;
2614 	*ms_ret = ms;
2615 	return 0;
2616 }
2617 
2618 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2619 			  int to_nodeid, int mstype,
2620 			  struct dlm_message **ms_ret,
2621 			  struct dlm_mhandle **mh_ret)
2622 {
2623 	int mb_len = sizeof(struct dlm_message);
2624 
2625 	switch (mstype) {
2626 	case DLM_MSG_REQUEST:
2627 	case DLM_MSG_LOOKUP:
2628 	case DLM_MSG_REMOVE:
2629 		mb_len += r->res_length;
2630 		break;
2631 	case DLM_MSG_CONVERT:
2632 	case DLM_MSG_UNLOCK:
2633 	case DLM_MSG_REQUEST_REPLY:
2634 	case DLM_MSG_CONVERT_REPLY:
2635 	case DLM_MSG_GRANT:
2636 		if (lkb && lkb->lkb_lvbptr)
2637 			mb_len += r->res_ls->ls_lvblen;
2638 		break;
2639 	}
2640 
2641 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2642 			       ms_ret, mh_ret);
2643 }
2644 
2645 /* further lowcomms enhancements or alternate implementations may make
2646    the return value from this function useful at some point */
2647 
2648 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2649 {
2650 	dlm_message_out(ms);
2651 	dlm_lowcomms_commit_buffer(mh);
2652 	return 0;
2653 }
2654 
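/* send_args -- copy the lkb state into an outgoing message; the resource
   name or the lvb rides in m_extra depending on the message type */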
2655 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2656 		      struct dlm_message *ms)
2657 {
2658 	ms->m_nodeid   = lkb->lkb_nodeid;
2659 	ms->m_pid      = lkb->lkb_ownpid;
2660 	ms->m_lkid     = lkb->lkb_id;
2661 	ms->m_remid    = lkb->lkb_remid;
2662 	ms->m_exflags  = lkb->lkb_exflags;
2663 	ms->m_sbflags  = lkb->lkb_sbflags;
2664 	ms->m_flags    = lkb->lkb_flags;
2665 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2666 	ms->m_status   = lkb->lkb_status;
2667 	ms->m_grmode   = lkb->lkb_grmode;
2668 	ms->m_rqmode   = lkb->lkb_rqmode;
2669 	ms->m_hash     = r->res_hash;
2670 
2671 	/* m_result and m_bastmode are set from function args,
2672 	   not from lkb fields */
2673 
2674 	if (lkb->lkb_bastaddr)
2675 		ms->m_asts |= AST_BAST;
2676 	if (lkb->lkb_astaddr)
2677 		ms->m_asts |= AST_COMP;
2678 
2679 	/* compare with switch in create_message; send_remove() doesn't
2680 	   use send_args() */
2681 
2682 	switch (ms->m_type) {
2683 	case DLM_MSG_REQUEST:
2684 	case DLM_MSG_LOOKUP:
2685 		memcpy(ms->m_extra, r->res_name, r->res_length);
2686 		break;
2687 	case DLM_MSG_CONVERT:
2688 	case DLM_MSG_UNLOCK:
2689 	case DLM_MSG_REQUEST_REPLY:
2690 	case DLM_MSG_CONVERT_REPLY:
2691 	case DLM_MSG_GRANT:
2692 		if (!lkb->lkb_lvbptr)
2693 			break;
2694 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2695 		break;
2696 	}
2697 }
2698 
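/* send_common -- add the lkb to the waiters list for the expected reply,
   then build and send the message; on failure the lkb is removed again */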
2699 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2700 {
2701 	struct dlm_message *ms;
2702 	struct dlm_mhandle *mh;
2703 	int to_nodeid, error;
2704 
2705 	error = add_to_waiters(lkb, mstype);
2706 	if (error)
2707 		return error;
2708 
2709 	to_nodeid = r->res_nodeid;
2710 
2711 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2712 	if (error)
2713 		goto fail;
2714 
2715 	send_args(r, lkb, ms);
2716 
2717 	error = send_message(mh, ms);
2718 	if (error)
2719 		goto fail;
2720 	return 0;
2721 
2722  fail:
2723 	remove_from_waiters(lkb, msg_reply_type(mstype));
2724 	return error;
2725 }
2726 
2727 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2728 {
2729 	return send_common(r, lkb, DLM_MSG_REQUEST);
2730 }
2731 
2732 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2733 {
2734 	int error;
2735 
2736 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2737 
2738 	/* down conversions go without a reply from the master */
2739 	if (!error && down_conversion(lkb)) {
2740 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2741 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2742 		r->res_ls->ls_stub_ms.m_result = 0;
2743 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2744 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2745 	}
2746 
2747 	return error;
2748 }
2749 
2750 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2751    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2752    that the master is still correct. */
2753 
2754 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2755 {
2756 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2757 }
2758 
2759 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2760 {
2761 	return send_common(r, lkb, DLM_MSG_CANCEL);
2762 }
2763 
2764 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2765 {
2766 	struct dlm_message *ms;
2767 	struct dlm_mhandle *mh;
2768 	int to_nodeid, error;
2769 
2770 	to_nodeid = lkb->lkb_nodeid;
2771 
2772 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2773 	if (error)
2774 		goto out;
2775 
2776 	send_args(r, lkb, ms);
2777 
2778 	ms->m_result = 0;
2779 
2780 	error = send_message(mh, ms);
2781  out:
2782 	return error;
2783 }
2784 
2785 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2786 {
2787 	struct dlm_message *ms;
2788 	struct dlm_mhandle *mh;
2789 	int to_nodeid, error;
2790 
2791 	to_nodeid = lkb->lkb_nodeid;
2792 
2793 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2794 	if (error)
2795 		goto out;
2796 
2797 	send_args(r, lkb, ms);
2798 
2799 	ms->m_bastmode = mode;
2800 
2801 	error = send_message(mh, ms);
2802  out:
2803 	return error;
2804 }
2805 
2806 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2807 {
2808 	struct dlm_message *ms;
2809 	struct dlm_mhandle *mh;
2810 	int to_nodeid, error;
2811 
2812 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2813 	if (error)
2814 		return error;
2815 
2816 	to_nodeid = dlm_dir_nodeid(r);
2817 
2818 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2819 	if (error)
2820 		goto fail;
2821 
2822 	send_args(r, lkb, ms);
2823 
2824 	error = send_message(mh, ms);
2825 	if (error)
2826 		goto fail;
2827 	return 0;
2828 
2829  fail:
2830 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2831 	return error;
2832 }
2833 
2834 static int send_remove(struct dlm_rsb *r)
2835 {
2836 	struct dlm_message *ms;
2837 	struct dlm_mhandle *mh;
2838 	int to_nodeid, error;
2839 
2840 	to_nodeid = dlm_dir_nodeid(r);
2841 
2842 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2843 	if (error)
2844 		goto out;
2845 
2846 	memcpy(ms->m_extra, r->res_name, r->res_length);
2847 	ms->m_hash = r->res_hash;
2848 
2849 	error = send_message(mh, ms);
2850  out:
2851 	return error;
2852 }
2853 
2854 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2855 			     int mstype, int rv)
2856 {
2857 	struct dlm_message *ms;
2858 	struct dlm_mhandle *mh;
2859 	int to_nodeid, error;
2860 
2861 	to_nodeid = lkb->lkb_nodeid;
2862 
2863 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2864 	if (error)
2865 		goto out;
2866 
2867 	send_args(r, lkb, ms);
2868 
2869 	ms->m_result = rv;
2870 
2871 	error = send_message(mh, ms);
2872  out:
2873 	return error;
2874 }
2875 
2876 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2877 {
2878 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2879 }
2880 
2881 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2882 {
2883 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2884 }
2885 
2886 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2887 {
2888 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2889 }
2890 
2891 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2892 {
2893 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2894 }
2895 
2896 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2897 			     int ret_nodeid, int rv)
2898 {
2899 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2900 	struct dlm_message *ms;
2901 	struct dlm_mhandle *mh;
2902 	int error, nodeid = ms_in->m_header.h_nodeid;
2903 
2904 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2905 	if (error)
2906 		goto out;
2907 
2908 	ms->m_lkid = ms_in->m_lkid;
2909 	ms->m_result = rv;
2910 	ms->m_nodeid = ret_nodeid;
2911 
2912 	error = send_message(mh, ms);
2913  out:
2914 	return error;
2915 }
2916 
2917 /* which args we save from a received message depends heavily on the type
2918    of message, unlike the send side where we can safely send everything about
2919    the lkb for any type of message */
2920 
2921 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2922 {
2923 	lkb->lkb_exflags = ms->m_exflags;
2924 	lkb->lkb_sbflags = ms->m_sbflags;
2925 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2926 		         (ms->m_flags & 0x0000FFFF);
2927 }
2928 
2929 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930 {
2931 	lkb->lkb_sbflags = ms->m_sbflags;
2932 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2933 		         (ms->m_flags & 0x0000FFFF);
2934 }
2935 
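/* number of bytes of variable data (resource name or lvb) appended after
   the fixed-size message */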
2936 static int receive_extralen(struct dlm_message *ms)
2937 {
2938 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2939 }
2940 
2941 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2942 		       struct dlm_message *ms)
2943 {
2944 	int len;
2945 
2946 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2947 		if (!lkb->lkb_lvbptr)
2948 			lkb->lkb_lvbptr = allocate_lvb(ls);
2949 		if (!lkb->lkb_lvbptr)
2950 			return -ENOMEM;
2951 		len = receive_extralen(ms);
2952 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2953 	}
2954 	return 0;
2955 }
2956 
2957 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2958 				struct dlm_message *ms)
2959 {
2960 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2961 	lkb->lkb_ownpid = ms->m_pid;
2962 	lkb->lkb_remid = ms->m_lkid;
2963 	lkb->lkb_grmode = DLM_LOCK_IV;
2964 	lkb->lkb_rqmode = ms->m_rqmode;
2965 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2966 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2967 
2968 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2969 
2970 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2971 		/* lkb was just created so there won't be an lvb yet */
2972 		lkb->lkb_lvbptr = allocate_lvb(ls);
2973 		if (!lkb->lkb_lvbptr)
2974 			return -ENOMEM;
2975 	}
2976 
2977 	return 0;
2978 }
2979 
2980 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2981 				struct dlm_message *ms)
2982 {
2983 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2984 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2985 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2986 			  lkb->lkb_id, lkb->lkb_remid);
2987 		return -EINVAL;
2988 	}
2989 
2990 	if (!is_master_copy(lkb))
2991 		return -EINVAL;
2992 
2993 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2994 		return -EBUSY;
2995 
2996 	if (receive_lvb(ls, lkb, ms))
2997 		return -ENOMEM;
2998 
2999 	lkb->lkb_rqmode = ms->m_rqmode;
3000 	lkb->lkb_lvbseq = ms->m_lvbseq;
3001 
3002 	return 0;
3003 }
3004 
3005 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3006 			       struct dlm_message *ms)
3007 {
3008 	if (!is_master_copy(lkb))
3009 		return -EINVAL;
3010 	if (receive_lvb(ls, lkb, ms))
3011 		return -ENOMEM;
3012 	return 0;
3013 }
3014 
3015 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3016    uses to send a reply and that the remote end uses to process the reply. */
3017 
3018 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3019 {
3020 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3021 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3022 	lkb->lkb_remid = ms->m_lkid;
3023 }
3024 
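/* receive_request -- master-side handling of a remote request: create a
   master-copy (MSTCPY) lkb for the remote requester, find or create the rsb,
   run do_request() and send the result back in a request reply */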
3025 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3026 {
3027 	struct dlm_lkb *lkb;
3028 	struct dlm_rsb *r;
3029 	int error, namelen;
3030 
3031 	error = create_lkb(ls, &lkb);
3032 	if (error)
3033 		goto fail;
3034 
3035 	receive_flags(lkb, ms);
3036 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3037 	error = receive_request_args(ls, lkb, ms);
3038 	if (error) {
3039 		__put_lkb(ls, lkb);
3040 		goto fail;
3041 	}
3042 
3043 	namelen = receive_extralen(ms);
3044 
3045 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3046 	if (error) {
3047 		__put_lkb(ls, lkb);
3048 		goto fail;
3049 	}
3050 
3051 	lock_rsb(r);
3052 
3053 	attach_lkb(r, lkb);
3054 	error = do_request(r, lkb);
3055 	send_request_reply(r, lkb, error);
3056 
3057 	unlock_rsb(r);
3058 	put_rsb(r);
3059 
3060 	if (error == -EINPROGRESS)
3061 		error = 0;
3062 	if (error)
3063 		dlm_put_lkb(lkb);
3064 	return;
3065 
3066  fail:
3067 	setup_stub_lkb(ls, ms);
3068 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3069 }
3070 
3071 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3072 {
3073 	struct dlm_lkb *lkb;
3074 	struct dlm_rsb *r;
3075 	int error, reply = 1;
3076 
3077 	error = find_lkb(ls, ms->m_remid, &lkb);
3078 	if (error)
3079 		goto fail;
3080 
3081 	r = lkb->lkb_resource;
3082 
3083 	hold_rsb(r);
3084 	lock_rsb(r);
3085 
3086 	receive_flags(lkb, ms);
3087 	error = receive_convert_args(ls, lkb, ms);
3088 	if (error)
3089 		goto out;
3090 	reply = !down_conversion(lkb);
3091 
3092 	error = do_convert(r, lkb);
3093  out:
3094 	if (reply)
3095 		send_convert_reply(r, lkb, error);
3096 
3097 	unlock_rsb(r);
3098 	put_rsb(r);
3099 	dlm_put_lkb(lkb);
3100 	return;
3101 
3102  fail:
3103 	setup_stub_lkb(ls, ms);
3104 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3105 }
3106 
3107 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3108 {
3109 	struct dlm_lkb *lkb;
3110 	struct dlm_rsb *r;
3111 	int error;
3112 
3113 	error = find_lkb(ls, ms->m_remid, &lkb);
3114 	if (error)
3115 		goto fail;
3116 
3117 	r = lkb->lkb_resource;
3118 
3119 	hold_rsb(r);
3120 	lock_rsb(r);
3121 
3122 	receive_flags(lkb, ms);
3123 	error = receive_unlock_args(ls, lkb, ms);
3124 	if (error)
3125 		goto out;
3126 
3127 	error = do_unlock(r, lkb);
3128  out:
3129 	send_unlock_reply(r, lkb, error);
3130 
3131 	unlock_rsb(r);
3132 	put_rsb(r);
3133 	dlm_put_lkb(lkb);
3134 	return;
3135 
3136  fail:
3137 	setup_stub_lkb(ls, ms);
3138 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3139 }
3140 
3141 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3142 {
3143 	struct dlm_lkb *lkb;
3144 	struct dlm_rsb *r;
3145 	int error;
3146 
3147 	error = find_lkb(ls, ms->m_remid, &lkb);
3148 	if (error)
3149 		goto fail;
3150 
3151 	receive_flags(lkb, ms);
3152 
3153 	r = lkb->lkb_resource;
3154 
3155 	hold_rsb(r);
3156 	lock_rsb(r);
3157 
3158 	error = do_cancel(r, lkb);
3159 	send_cancel_reply(r, lkb, error);
3160 
3161 	unlock_rsb(r);
3162 	put_rsb(r);
3163 	dlm_put_lkb(lkb);
3164 	return;
3165 
3166  fail:
3167 	setup_stub_lkb(ls, ms);
3168 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3169 }
3170 
3171 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3172 {
3173 	struct dlm_lkb *lkb;
3174 	struct dlm_rsb *r;
3175 	int error;
3176 
3177 	error = find_lkb(ls, ms->m_remid, &lkb);
3178 	if (error) {
3179 		log_error(ls, "receive_grant no lkb");
3180 		return;
3181 	}
3182 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3183 
3184 	r = lkb->lkb_resource;
3185 
3186 	hold_rsb(r);
3187 	lock_rsb(r);
3188 
3189 	receive_flags_reply(lkb, ms);
3190 	if (is_altmode(lkb))
3191 		munge_altmode(lkb, ms);
3192 	grant_lock_pc(r, lkb, ms);
3193 	queue_cast(r, lkb, 0);
3194 
3195 	unlock_rsb(r);
3196 	put_rsb(r);
3197 	dlm_put_lkb(lkb);
3198 }
3199 
3200 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3201 {
3202 	struct dlm_lkb *lkb;
3203 	struct dlm_rsb *r;
3204 	int error;
3205 
3206 	error = find_lkb(ls, ms->m_remid, &lkb);
3207 	if (error) {
3208 		log_error(ls, "receive_bast no lkb");
3209 		return;
3210 	}
3211 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3212 
3213 	r = lkb->lkb_resource;
3214 
3215 	hold_rsb(r);
3216 	lock_rsb(r);
3217 
3218 	queue_bast(r, lkb, ms->m_bastmode);
3219 
3220 	unlock_rsb(r);
3221 	put_rsb(r);
3222 	dlm_put_lkb(lkb);
3223 }
3224 
3225 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3226 {
3227 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3228 
3229 	from_nodeid = ms->m_header.h_nodeid;
3230 	our_nodeid = dlm_our_nodeid();
3231 
3232 	len = receive_extralen(ms);
3233 
3234 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3235 	if (dir_nodeid != our_nodeid) {
3236 		log_error(ls, "lookup dir_nodeid %d from %d",
3237 			  dir_nodeid, from_nodeid);
3238 		error = -EINVAL;
3239 		ret_nodeid = -1;
3240 		goto out;
3241 	}
3242 
3243 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3244 
3245 	/* Optimization: we're master so treat lookup as a request */
3246 	if (!error && ret_nodeid == our_nodeid) {
3247 		receive_request(ls, ms);
3248 		return;
3249 	}
3250  out:
3251 	send_lookup_reply(ls, ms, ret_nodeid, error);
3252 }
3253 
3254 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3255 {
3256 	int len, dir_nodeid, from_nodeid;
3257 
3258 	from_nodeid = ms->m_header.h_nodeid;
3259 
3260 	len = receive_extralen(ms);
3261 
3262 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3263 	if (dir_nodeid != dlm_our_nodeid()) {
3264 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3265 			  dir_nodeid, from_nodeid);
3266 		return;
3267 	}
3268 
3269 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3270 }
3271 
3272 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3273 {
3274 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3275 }
3276 
3277 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3278 {
3279 	struct dlm_lkb *lkb;
3280 	struct dlm_rsb *r;
3281 	int error, mstype, result;
3282 
3283 	error = find_lkb(ls, ms->m_remid, &lkb);
3284 	if (error) {
3285 		log_error(ls, "receive_request_reply no lkb");
3286 		return;
3287 	}
3288 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3289 
3290 	r = lkb->lkb_resource;
3291 	hold_rsb(r);
3292 	lock_rsb(r);
3293 
3294 	mstype = lkb->lkb_wait_type;
3295 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3296 	if (error)
3297 		goto out;
3298 
3299 	/* Optimization: the dir node was also the master, so it took our
3300 	   lookup as a request and sent request reply instead of lookup reply */
3301 	if (mstype == DLM_MSG_LOOKUP) {
3302 		r->res_nodeid = ms->m_header.h_nodeid;
3303 		lkb->lkb_nodeid = r->res_nodeid;
3304 	}
3305 
3306 	/* this is the value returned from do_request() on the master */
3307 	result = ms->m_result;
3308 
3309 	switch (result) {
3310 	case -EAGAIN:
3311 		/* request would block (be queued) on remote master */
3312 		queue_cast(r, lkb, -EAGAIN);
3313 		confirm_master(r, -EAGAIN);
3314 		unhold_lkb(lkb); /* undoes create_lkb() */
3315 		break;
3316 
3317 	case -EINPROGRESS:
3318 	case 0:
3319 		/* request was queued or granted on remote master */
3320 		receive_flags_reply(lkb, ms);
3321 		lkb->lkb_remid = ms->m_lkid;
3322 		if (is_altmode(lkb))
3323 			munge_altmode(lkb, ms);
3324 		if (result) {
3325 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3326 			add_timeout(lkb);
3327 		} else {
3328 			grant_lock_pc(r, lkb, ms);
3329 			queue_cast(r, lkb, 0);
3330 		}
3331 		confirm_master(r, result);
3332 		break;
3333 
3334 	case -EBADR:
3335 	case -ENOTBLK:
3336 		/* find_rsb failed to find rsb or rsb wasn't master */
3337 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3338 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3339 		r->res_nodeid = -1;
3340 		lkb->lkb_nodeid = -1;
3341 
3342 		if (is_overlap(lkb)) {
3343 			/* we'll ignore error in cancel/unlock reply */
3344 			queue_cast_overlap(r, lkb);
3345 			unhold_lkb(lkb); /* undoes create_lkb() */
3346 		} else
3347 			_request_lock(r, lkb);
3348 		break;
3349 
3350 	default:
3351 		log_error(ls, "receive_request_reply %x error %d",
3352 			  lkb->lkb_id, result);
3353 	}
3354 
3355 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3356 		log_debug(ls, "receive_request_reply %x result %d unlock",
3357 			  lkb->lkb_id, result);
3358 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3359 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3360 		send_unlock(r, lkb);
3361 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3362 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3363 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3364 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3365 		send_cancel(r, lkb);
3366 	} else {
3367 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3368 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3369 	}
3370  out:
3371 	unlock_rsb(r);
3372 	put_rsb(r);
3373 	dlm_put_lkb(lkb);
3374 }
3375 
3376 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3377 				    struct dlm_message *ms)
3378 {
3379 	/* this is the value returned from do_convert() on the master */
3380 	switch (ms->m_result) {
3381 	case -EAGAIN:
3382 		/* convert would block (be queued) on remote master */
3383 		queue_cast(r, lkb, -EAGAIN);
3384 		break;
3385 
3386 	case -EDEADLK:
3387 		receive_flags_reply(lkb, ms);
3388 		revert_lock_pc(r, lkb);
3389 		queue_cast(r, lkb, -EDEADLK);
3390 		break;
3391 
3392 	case -EINPROGRESS:
3393 		/* convert was queued on remote master */
3394 		receive_flags_reply(lkb, ms);
3395 		if (is_demoted(lkb))
3396 			munge_demoted(lkb, ms);
3397 		del_lkb(r, lkb);
3398 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3399 		add_timeout(lkb);
3400 		break;
3401 
3402 	case 0:
3403 		/* convert was granted on remote master */
3404 		receive_flags_reply(lkb, ms);
3405 		if (is_demoted(lkb))
3406 			munge_demoted(lkb, ms);
3407 		grant_lock_pc(r, lkb, ms);
3408 		queue_cast(r, lkb, 0);
3409 		break;
3410 
3411 	default:
3412 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3413 			  lkb->lkb_id, ms->m_result);
3414 	}
3415 }
3416 
3417 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3418 {
3419 	struct dlm_rsb *r = lkb->lkb_resource;
3420 	int error;
3421 
3422 	hold_rsb(r);
3423 	lock_rsb(r);
3424 
3425 	/* stub reply can happen with waiters_mutex held */
3426 	error = remove_from_waiters_ms(lkb, ms);
3427 	if (error)
3428 		goto out;
3429 
3430 	__receive_convert_reply(r, lkb, ms);
3431  out:
3432 	unlock_rsb(r);
3433 	put_rsb(r);
3434 }
3435 
3436 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3437 {
3438 	struct dlm_lkb *lkb;
3439 	int error;
3440 
3441 	error = find_lkb(ls, ms->m_remid, &lkb);
3442 	if (error) {
3443 		log_error(ls, "receive_convert_reply no lkb");
3444 		return;
3445 	}
3446 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3447 
3448 	_receive_convert_reply(lkb, ms);
3449 	dlm_put_lkb(lkb);
3450 }
3451 
3452 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3453 {
3454 	struct dlm_rsb *r = lkb->lkb_resource;
3455 	int error;
3456 
3457 	hold_rsb(r);
3458 	lock_rsb(r);
3459 
3460 	/* stub reply can happen with waiters_mutex held */
3461 	error = remove_from_waiters_ms(lkb, ms);
3462 	if (error)
3463 		goto out;
3464 
3465 	/* this is the value returned from do_unlock() on the master */
3466 
3467 	switch (ms->m_result) {
3468 	case -DLM_EUNLOCK:
3469 		receive_flags_reply(lkb, ms);
3470 		remove_lock_pc(r, lkb);
3471 		queue_cast(r, lkb, -DLM_EUNLOCK);
3472 		break;
3473 	case -ENOENT:
3474 		break;
3475 	default:
3476 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3477 			  lkb->lkb_id, ms->m_result);
3478 	}
3479  out:
3480 	unlock_rsb(r);
3481 	put_rsb(r);
3482 }
3483 
3484 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3485 {
3486 	struct dlm_lkb *lkb;
3487 	int error;
3488 
3489 	error = find_lkb(ls, ms->m_remid, &lkb);
3490 	if (error) {
3491 		log_error(ls, "receive_unlock_reply no lkb");
3492 		return;
3493 	}
3494 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3495 
3496 	_receive_unlock_reply(lkb, ms);
3497 	dlm_put_lkb(lkb);
3498 }
3499 
3500 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3501 {
3502 	struct dlm_rsb *r = lkb->lkb_resource;
3503 	int error;
3504 
3505 	hold_rsb(r);
3506 	lock_rsb(r);
3507 
3508 	/* stub reply can happen with waiters_mutex held */
3509 	error = remove_from_waiters_ms(lkb, ms);
3510 	if (error)
3511 		goto out;
3512 
3513 	/* this is the value returned from do_cancel() on the master */
3514 
3515 	switch (ms->m_result) {
3516 	case -DLM_ECANCEL:
3517 		receive_flags_reply(lkb, ms);
3518 		revert_lock_pc(r, lkb);
3519 		queue_cast(r, lkb, -DLM_ECANCEL);
3520 		break;
3521 	case 0:
3522 		break;
3523 	default:
3524 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3525 			  lkb->lkb_id, ms->m_result);
3526 	}
3527  out:
3528 	unlock_rsb(r);
3529 	put_rsb(r);
3530 }
3531 
3532 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3533 {
3534 	struct dlm_lkb *lkb;
3535 	int error;
3536 
3537 	error = find_lkb(ls, ms->m_remid, &lkb);
3538 	if (error) {
3539 		log_error(ls, "receive_cancel_reply no lkb");
3540 		return;
3541 	}
3542 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3543 
3544 	_receive_cancel_reply(lkb, ms);
3545 	dlm_put_lkb(lkb);
3546 }
3547 
3548 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3549 {
3550 	struct dlm_lkb *lkb;
3551 	struct dlm_rsb *r;
3552 	int error, ret_nodeid;
3553 
3554 	error = find_lkb(ls, ms->m_lkid, &lkb);
3555 	if (error) {
3556 		log_error(ls, "receive_lookup_reply no lkb");
3557 		return;
3558 	}
3559 
3560 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3561 	   FIXME: will a non-zero error ever be returned? */
3562 
3563 	r = lkb->lkb_resource;
3564 	hold_rsb(r);
3565 	lock_rsb(r);
3566 
3567 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3568 	if (error)
3569 		goto out;
3570 
3571 	ret_nodeid = ms->m_nodeid;
3572 	if (ret_nodeid == dlm_our_nodeid()) {
3573 		r->res_nodeid = 0;
3574 		ret_nodeid = 0;
3575 		r->res_first_lkid = 0;
3576 	} else {
3577 		/* set_master() will copy res_nodeid to lkb_nodeid */
3578 		r->res_nodeid = ret_nodeid;
3579 	}
3580 
3581 	if (is_overlap(lkb)) {
3582 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3583 			  lkb->lkb_id, lkb->lkb_flags);
3584 		queue_cast_overlap(r, lkb);
3585 		unhold_lkb(lkb); /* undoes create_lkb() */
3586 		goto out_list;
3587 	}
3588 
3589 	_request_lock(r, lkb);
3590 
3591  out_list:
3592 	if (!ret_nodeid)
3593 		process_lookup_list(r);
3594  out:
3595 	unlock_rsb(r);
3596 	put_rsb(r);
3597 	dlm_put_lkb(lkb);
3598 }
3599 
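/* dlm_receive_message -- entry point for incoming dlm messages (and for
   dlm_recoverd replaying saved ones); waits for or saves messages around
   recovery, then dispatches on m_type to the receive_xxxx() handlers above */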
3600 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3601 {
3602 	struct dlm_message *ms = (struct dlm_message *) hd;
3603 	struct dlm_ls *ls;
3604 	int error = 0;
3605 
3606 	if (!recovery)
3607 		dlm_message_in(ms);
3608 
3609 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3610 	if (!ls) {
3611 		log_print("drop message %d from %d for unknown lockspace %d",
3612 			  ms->m_type, nodeid, hd->h_lockspace);
3613 		return -EINVAL;
3614 	}
3615 
3616 	/* recovery may have just ended leaving a bunch of backed-up requests
3617 	   in the requestqueue; wait while dlm_recoverd clears them */
3618 
3619 	if (!recovery)
3620 		dlm_wait_requestqueue(ls);
3621 
3622 	/* recovery may have just started while there were a bunch of
3623 	   in-flight requests -- save them in requestqueue to be processed
3624 	   after recovery.  we can't let dlm_recvd block on the recovery
3625 	   lock.  if dlm_recoverd is calling this function to clear the
3626 	   requestqueue, it needs to be interrupted (-EINTR) if another
3627 	   recovery operation is starting. */
3628 
3629 	while (1) {
3630 		if (dlm_locking_stopped(ls)) {
3631 			if (recovery) {
3632 				error = -EINTR;
3633 				goto out;
3634 			}
3635 			error = dlm_add_requestqueue(ls, nodeid, hd);
3636 			if (error == -EAGAIN)
3637 				continue;
3638 			else {
3639 				error = -EINTR;
3640 				goto out;
3641 			}
3642 		}
3643 
3644 		if (dlm_lock_recovery_try(ls))
3645 			break;
3646 		schedule();
3647 	}
3648 
3649 	switch (ms->m_type) {
3650 
3651 	/* messages sent to a master node */
3652 
3653 	case DLM_MSG_REQUEST:
3654 		receive_request(ls, ms);
3655 		break;
3656 
3657 	case DLM_MSG_CONVERT:
3658 		receive_convert(ls, ms);
3659 		break;
3660 
3661 	case DLM_MSG_UNLOCK:
3662 		receive_unlock(ls, ms);
3663 		break;
3664 
3665 	case DLM_MSG_CANCEL:
3666 		receive_cancel(ls, ms);
3667 		break;
3668 
3669 	/* messages sent from a master node (replies to above) */
3670 
3671 	case DLM_MSG_REQUEST_REPLY:
3672 		receive_request_reply(ls, ms);
3673 		break;
3674 
3675 	case DLM_MSG_CONVERT_REPLY:
3676 		receive_convert_reply(ls, ms);
3677 		break;
3678 
3679 	case DLM_MSG_UNLOCK_REPLY:
3680 		receive_unlock_reply(ls, ms);
3681 		break;
3682 
3683 	case DLM_MSG_CANCEL_REPLY:
3684 		receive_cancel_reply(ls, ms);
3685 		break;
3686 
3687 	/* messages sent from a master node (only two types of async msg) */
3688 
3689 	case DLM_MSG_GRANT:
3690 		receive_grant(ls, ms);
3691 		break;
3692 
3693 	case DLM_MSG_BAST:
3694 		receive_bast(ls, ms);
3695 		break;
3696 
3697 	/* messages sent to a dir node */
3698 
3699 	case DLM_MSG_LOOKUP:
3700 		receive_lookup(ls, ms);
3701 		break;
3702 
3703 	case DLM_MSG_REMOVE:
3704 		receive_remove(ls, ms);
3705 		break;
3706 
3707 	/* messages sent from a dir node (remove has no reply) */
3708 
3709 	case DLM_MSG_LOOKUP_REPLY:
3710 		receive_lookup_reply(ls, ms);
3711 		break;
3712 
3713 	/* other messages */
3714 
3715 	case DLM_MSG_PURGE:
3716 		receive_purge(ls, ms);
3717 		break;
3718 
3719 	default:
3720 		log_error(ls, "unknown message type %d", ms->m_type);
3721 	}
3722 
3723 	dlm_unlock_recovery(ls);
3724  out:
3725 	dlm_put_lockspace(ls);
3726 	dlm_astd_wake();
3727 	return error;
3728 }
3729 
3730 
3731 /*
3732  * Recovery related
3733  */
3734 
3735 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3736 {
3737 	if (middle_conversion(lkb)) {
3738 		hold_lkb(lkb);
3739 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3740 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3741 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3742 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3743 
3744 		/* Same special case as in receive_rcom_lock_args() */
3745 		lkb->lkb_grmode = DLM_LOCK_IV;
3746 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3747 		unhold_lkb(lkb);
3748 
3749 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3750 		lkb->lkb_flags |= DLM_IFL_RESEND;
3751 	}
3752 
3753 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3754 	   conversions are async; there's no reply from the remote master */
3755 }
3756 
3757 /* A waiting lkb needs recovery if the master node has failed, or
3758    the master node is changing (only when no directory is used) */
3759 
3760 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3761 {
3762 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3763 		return 1;
3764 
3765 	if (!dlm_no_directory(ls))
3766 		return 0;
3767 
3768 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3769 		return 1;
3770 
3771 	return 0;
3772 }
3773 
3774 /* Recovery for locks that are waiting for replies from nodes that are now
3775    gone.  We can just complete unlocks and cancels by faking a reply from the
3776    dead node.  Requests and up-conversions we flag to be resent after
3777    recovery.  Down-conversions can just be completed with a fake reply like
3778    unlocks.  Conversions between PR and CW need special attention. */
3779 
3780 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3781 {
3782 	struct dlm_lkb *lkb, *safe;
3783 
3784 	mutex_lock(&ls->ls_waiters_mutex);
3785 
3786 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3787 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3788 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3789 
3790 		/* all outstanding lookups, regardless of destination, will be
3791 		   resent after recovery is done */
3792 
3793 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3794 			lkb->lkb_flags |= DLM_IFL_RESEND;
3795 			continue;
3796 		}
3797 
3798 		if (!waiter_needs_recovery(ls, lkb))
3799 			continue;
3800 
3801 		switch (lkb->lkb_wait_type) {
3802 
3803 		case DLM_MSG_REQUEST:
3804 			lkb->lkb_flags |= DLM_IFL_RESEND;
3805 			break;
3806 
3807 		case DLM_MSG_CONVERT:
3808 			recover_convert_waiter(ls, lkb);
3809 			break;
3810 
3811 		case DLM_MSG_UNLOCK:
3812 			hold_lkb(lkb);
3813 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3814 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3815 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3816 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3817 			dlm_put_lkb(lkb);
3818 			break;
3819 
3820 		case DLM_MSG_CANCEL:
3821 			hold_lkb(lkb);
3822 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3823 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3824 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3825 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3826 			dlm_put_lkb(lkb);
3827 			break;
3828 
3829 		default:
3830 			log_error(ls, "invalid lkb wait_type %d",
3831 				  lkb->lkb_wait_type);
3832 		}
3833 		schedule();
3834 	}
3835 	mutex_unlock(&ls->ls_waiters_mutex);
3836 }
3837 
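/* return the next lkb flagged RESEND by dlm_recover_waiters_pre(), holding
   a reference on it, or NULL when none remain */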
3838 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3839 {
3840 	struct dlm_lkb *lkb;
3841 	int found = 0;
3842 
3843 	mutex_lock(&ls->ls_waiters_mutex);
3844 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3845 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3846 			hold_lkb(lkb);
3847 			found = 1;
3848 			break;
3849 		}
3850 	}
3851 	mutex_unlock(&ls->ls_waiters_mutex);
3852 
3853 	if (!found)
3854 		lkb = NULL;
3855 	return lkb;
3856 }
3857 
3858 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3859    master or dir-node for r.  Processing the lkb may result in it being placed
3860    back on waiters. */
3861 
3862 /* We do this after normal locking has been enabled and any saved messages
3863    (in requestqueue) have been processed.  We should be confident that at
3864    this point we won't get or process a reply to any of these waiting
3865    operations.  But, new ops may be coming in on the rsbs/locks here from
3866    userspace or remotely. */
3867 
3868 /* there may have been an overlap unlock/cancel prior to recovery or after
3869    recovery.  if before, the lkb may still have a positive wait_count; if
3870    after, the overlap flag would just have been set and nothing new sent.  we
3871    can be confident here that any replies to either the initial op or overlap
3872    ops prior to recovery have been received. */
3873 
3874 int dlm_recover_waiters_post(struct dlm_ls *ls)
3875 {
3876 	struct dlm_lkb *lkb;
3877 	struct dlm_rsb *r;
3878 	int error = 0, mstype, err, oc, ou;
3879 
3880 	while (1) {
3881 		if (dlm_locking_stopped(ls)) {
3882 			log_debug(ls, "recover_waiters_post aborted");
3883 			error = -EINTR;
3884 			break;
3885 		}
3886 
3887 		lkb = find_resend_waiter(ls);
3888 		if (!lkb)
3889 			break;
3890 
3891 		r = lkb->lkb_resource;
3892 		hold_rsb(r);
3893 		lock_rsb(r);
3894 
3895 		mstype = lkb->lkb_wait_type;
3896 		oc = is_overlap_cancel(lkb);
3897 		ou = is_overlap_unlock(lkb);
3898 		err = 0;
3899 
3900 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3901 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3902 
3903 		/* At this point we assume that we won't get a reply to any
3904 		   previous op or overlap op on this lock.  First, do the
3905 		   equivalent of a big remove_from_waiters() for all previous ops. */
3906 
3907 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
3908 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3909 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3910 		lkb->lkb_wait_type = 0;
3911 		lkb->lkb_wait_count = 0;
3912 		mutex_lock(&ls->ls_waiters_mutex);
3913 		list_del_init(&lkb->lkb_wait_reply);
3914 		mutex_unlock(&ls->ls_waiters_mutex);
3915 		unhold_lkb(lkb); /* for waiters list */
3916 
3917 		if (oc || ou) {
3918 			/* do an unlock or cancel instead of resending */
3919 			switch (mstype) {
3920 			case DLM_MSG_LOOKUP:
3921 			case DLM_MSG_REQUEST:
3922 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3923 							-DLM_ECANCEL);
3924 				unhold_lkb(lkb); /* undoes create_lkb() */
3925 				break;
3926 			case DLM_MSG_CONVERT:
3927 				if (oc) {
3928 					queue_cast(r, lkb, -DLM_ECANCEL);
3929 				} else {
3930 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3931 					_unlock_lock(r, lkb);
3932 				}
3933 				break;
3934 			default:
3935 				err = 1;
3936 			}
3937 		} else {
3938 			switch (mstype) {
3939 			case DLM_MSG_LOOKUP:
3940 			case DLM_MSG_REQUEST:
3941 				_request_lock(r, lkb);
3942 				if (is_master(r))
3943 					confirm_master(r, 0);
3944 				break;
3945 			case DLM_MSG_CONVERT:
3946 				_convert_lock(r, lkb);
3947 				break;
3948 			default:
3949 				err = 1;
3950 			}
3951 		}
3952 
3953 		if (err)
3954 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
3955 			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3956 		unlock_rsb(r);
3957 		put_rsb(r);
3958 		dlm_put_lkb(lkb);
3959 	}
3960 
3961 	return error;
3962 }
3963 
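/* For orientation, a rough sketch of where the two waiter-recovery passes
   sit in the sequence driven by dlm_recoverd (ordering per the comments
   above; the driver also rebuilds masters and locks between the calls):

	dlm_recover_waiters_pre(ls);    locking stopped: fake replies,
	                                set RESEND flags
	dlm_process_requestqueue(ls);   locking enabled: replay saved
	                                messages
	dlm_recover_waiters_post(ls);   resend or redo the flagged ops
*/
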
3964 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3965 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3966 {
3967 	struct dlm_ls *ls = r->res_ls;
3968 	struct dlm_lkb *lkb, *safe;
3969 
3970 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3971 		if (test(ls, lkb)) {
3972 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3973 			del_lkb(r, lkb);
3974 			/* this put should free the lkb */
3975 			if (!dlm_put_lkb(lkb))
3976 				log_error(ls, "purged lkb not released");
3977 		}
3978 	}
3979 }
3980 
3981 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3982 {
3983 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3984 }
3985 
3986 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3987 {
3988 	return is_master_copy(lkb);
3989 }
3990 
3991 static void purge_dead_locks(struct dlm_rsb *r)
3992 {
3993 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3994 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3995 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3996 }
3997 
3998 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3999 {
4000 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4001 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4002 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4003 }
4004 
4005 /* Get rid of locks held by nodes that are gone. */
4006 
4007 int dlm_purge_locks(struct dlm_ls *ls)
4008 {
4009 	struct dlm_rsb *r;
4010 
4011 	log_debug(ls, "dlm_purge_locks");
4012 
4013 	down_write(&ls->ls_root_sem);
4014 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4015 		hold_rsb(r);
4016 		lock_rsb(r);
4017 		if (is_master(r))
4018 			purge_dead_locks(r);
4019 		unlock_rsb(r);
4020 		unhold_rsb(r);
4021 
4022 		schedule();
4023 	}
4024 	up_write(&ls->ls_root_sem);
4025 
4026 	return 0;
4027 }
4028 
4029 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4030 {
4031 	struct dlm_rsb *r, *r_ret = NULL;
4032 
4033 	read_lock(&ls->ls_rsbtbl[bucket].lock);
4034 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4035 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4036 			continue;
4037 		hold_rsb(r);
4038 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4039 		r_ret = r;
4040 		break;
4041 	}
4042 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
4043 	return r_ret;
4044 }
4045 
4046 void dlm_grant_after_purge(struct dlm_ls *ls)
4047 {
4048 	struct dlm_rsb *r;
4049 	int bucket = 0;
4050 
4051 	while (1) {
4052 		r = find_purged_rsb(ls, bucket);
4053 		if (!r) {
4054 			if (bucket == ls->ls_rsbtbl_size - 1)
4055 				break;
4056 			bucket++;
4057 			continue;
4058 		}
4059 		lock_rsb(r);
4060 		if (is_master(r)) {
4061 			grant_pending_locks(r);
4062 			confirm_master(r, 0);
4063 		}
4064 		unlock_rsb(r);
4065 		put_rsb(r);
4066 		schedule();
4067 	}
4068 }
4069 
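/* Putting the purge pieces together: dlm_purge_locks() frees the
   master-copy lkbs held for removed nodes and marks each affected rsb
   with RSB_LOCKS_PURGED; dlm_grant_after_purge() then walks the hash
   buckets, clears that flag, and calls grant_pending_locks(), since
   locks freed by the purge may allow queued requests or conversions to
   be granted. */
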
4070 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4071 					 uint32_t remid)
4072 {
4073 	struct dlm_lkb *lkb;
4074 
4075 	list_for_each_entry(lkb, head, lkb_statequeue) {
4076 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4077 			return lkb;
4078 	}
4079 	return NULL;
4080 }
4081 
4082 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4083 				    uint32_t remid)
4084 {
4085 	struct dlm_lkb *lkb;
4086 
4087 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4088 	if (lkb)
4089 		return lkb;
4090 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4091 	if (lkb)
4092 		return lkb;
4093 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4094 	if (lkb)
4095 		return lkb;
4096 	return NULL;
4097 }
4098 
4099 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4101 {
4102 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4103 	int lvblen;
4104 
4105 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4106 	lkb->lkb_ownpid = rl->rl_ownpid;
4107 	lkb->lkb_remid = rl->rl_lkid;
4108 	lkb->lkb_exflags = rl->rl_exflags;
4109 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4110 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4111 	lkb->lkb_lvbseq = rl->rl_lvbseq;
4112 	lkb->lkb_rqmode = rl->rl_rqmode;
4113 	lkb->lkb_grmode = rl->rl_grmode;
4114 	/* don't set lkb_status because add_lkb wants to set it itself */
4115 
4116 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4117 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4118 
4119 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4120 		lkb->lkb_lvbptr = allocate_lvb(ls);
4121 		if (!lkb->lkb_lvbptr)
4122 			return -ENOMEM;
4123 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4124 			 sizeof(struct rcom_lock);
4125 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4126 	}
4127 
4128 	/* Conversions between PR and CW (middle modes) need special handling.
4129 	   The real granted mode of these converting locks cannot be determined
4130 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4131 
4132 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4133 		rl->rl_status = DLM_LKSTS_CONVERT;
4134 		lkb->lkb_grmode = DLM_LOCK_IV;
4135 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4136 	}
4137 
4138 	return 0;
4139 }
4140 
4141 /* This lkb may have been recovered in a previous aborted recovery so we need
4142    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4143    If so we just send back a standard reply.  If not, we create a new lkb with
4144    the given values and send back our lkid.  We send back our lkid by sending
4145    back the rcom_lock struct we got but with the remid field filled in. */
4146 
4147 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4148 {
4149 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4150 	struct dlm_rsb *r;
4151 	struct dlm_lkb *lkb;
4152 	int error;
4153 
4154 	if (rl->rl_parent_lkid) {
4155 		error = -EOPNOTSUPP;
4156 		goto out;
4157 	}
4158 
4159 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4160 	if (error)
4161 		goto out;
4162 
4163 	lock_rsb(r);
4164 
4165 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4166 	if (lkb) {
4167 		error = -EEXIST;
4168 		goto out_remid;
4169 	}
4170 
4171 	error = create_lkb(ls, &lkb);
4172 	if (error)
4173 		goto out_unlock;
4174 
4175 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4176 	if (error) {
4177 		__put_lkb(ls, lkb);
4178 		goto out_unlock;
4179 	}
4180 
4181 	attach_lkb(r, lkb);
4182 	add_lkb(r, lkb, rl->rl_status);
4183 	error = 0;
4184 
4185  out_remid:
4186 	/* this is the new value returned to the lock holder for
4187 	   saving in its process-copy lkb */
4188 	rl->rl_remid = lkb->lkb_id;
4189 
4190  out_unlock:
4191 	unlock_rsb(r);
4192 	put_rsb(r);
4193  out:
4194 	if (error)
4195 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
4196 	rl->rl_result = error;
4197 	return error;
4198 }
4199 
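/* The lkid/remid exchange sketched end to end (P = node holding the
   process copy, M = new master; the rcom_lock struct is the carrier):

	P: dlm_send_rcom_lock(r, lkb)      rl_lkid = P's lkb_id
	M: dlm_recover_master_copy()       find/create mstcpy lkb,
	                                   rl_remid = M's lkb_id
	P: dlm_recover_process_copy()      lkb_remid = rl_remid
*/
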
4200 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4201 {
4202 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4203 	struct dlm_rsb *r;
4204 	struct dlm_lkb *lkb;
4205 	int error;
4206 
4207 	error = find_lkb(ls, rl->rl_lkid, &lkb);
4208 	if (error) {
4209 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4210 		return error;
4211 	}
4212 
4213 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4214 
4215 	error = rl->rl_result;
4216 
4217 	r = lkb->lkb_resource;
4218 	hold_rsb(r);
4219 	lock_rsb(r);
4220 
4221 	switch (error) {
4222 	case -EBADR:
4223 		/* There's a chance the new master received our lock before
4224 		   dlm_recover_master_reply(); this wouldn't happen if we did
4225 		   a barrier between recover_masters and recover_locks. */
4226 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4227 			  (unsigned long)r, r->res_name);
4228 		dlm_send_rcom_lock(r, lkb);
4229 		goto out;
4230 	case -EEXIST:
4231 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4232 		/* fall through */
4233 	case 0:
4234 		lkb->lkb_remid = rl->rl_remid;
4235 		break;
4236 	default:
4237 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4238 			  error, lkb->lkb_id);
4239 	}
4240 
4241 	/* an ack for dlm_recover_locks(), which waits for a reply for each
4242 	   of the locks it sends to new masters */
4243 	dlm_recovered_lock(r);
4244  out:
4245 	unlock_rsb(r);
4246 	put_rsb(r);
4247 	dlm_put_lkb(lkb);
4248 
4249 	return 0;
4250 }
4251 
4252 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4253 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4254 		     unsigned long timeout_cs)
4255 {
4256 	struct dlm_lkb *lkb;
4257 	struct dlm_args args;
4258 	int error;
4259 
4260 	dlm_lock_recovery(ls);
4261 
4262 	error = create_lkb(ls, &lkb);
4263 	if (error) {
4264 		kfree(ua);
4265 		goto out;
4266 	}
4267 
4268 	if (flags & DLM_LKF_VALBLK) {
4269 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4270 		if (!ua->lksb.sb_lvbptr) {
4271 			kfree(ua);
4272 			__put_lkb(ls, lkb);
4273 			error = -ENOMEM;
4274 			goto out;
4275 		}
4276 	}
4277 
4278 	/* After ua is attached to lkb it will be freed by free_lkb().
4279 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4280 	   lock and that lkb_astparam is the dlm_user_args structure. */
4281 
4282 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4283 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4284 	lkb->lkb_flags |= DLM_IFL_USER;
4285 	ua->old_mode = DLM_LOCK_IV;
4286 
4287 	if (error) {
4288 		__put_lkb(ls, lkb);
4289 		goto out;
4290 	}
4291 
4292 	error = request_lock(ls, lkb, name, namelen, &args);
4293 
4294 	switch (error) {
4295 	case 0:
4296 		break;
4297 	case -EINPROGRESS:
4298 		error = 0;
4299 		break;
4300 	case -EAGAIN:
4301 		error = 0;
4302 		/* fall through */
4303 	default:
4304 		__put_lkb(ls, lkb);
4305 		goto out;
4306 	}
4307 
4308 	/* add this new lkb to the per-process list of locks */
4309 	spin_lock(&ua->proc->locks_spin);
4310 	hold_lkb(lkb);
4311 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4312 	spin_unlock(&ua->proc->locks_spin);
4313  out:
4314 	dlm_unlock_recovery(ls);
4315 	return error;
4316 }
4317 
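/* A successful request normally returns 0 here with the result
   delivered later through the fake user ast; that's why -EINPROGRESS
   from request_lock() is folded into 0 above.  Roughly how the
   device-write path in user.c is expected to call this (a sketch;
   marshalling omitted and the params field names are illustrative):

	error = dlm_user_request(ls, ua, params->mode, params->flags,
				 params->name, params->namelen,
				 (unsigned long) params->timeout);
*/
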
4318 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4319 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4320 		     unsigned long timeout_cs)
4321 {
4322 	struct dlm_lkb *lkb;
4323 	struct dlm_args args;
4324 	struct dlm_user_args *ua;
4325 	int error;
4326 
4327 	dlm_lock_recovery(ls);
4328 
4329 	error = find_lkb(ls, lkid, &lkb);
4330 	if (error)
4331 		goto out;
4332 
4333 	/* the user can change the params on its lock when converting it,
4334 	   or add an lvb that didn't exist before */
4335 
4336 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4337 
4338 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4339 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4340 		if (!ua->lksb.sb_lvbptr) {
4341 			error = -ENOMEM;
4342 			goto out_put;
4343 		}
4344 	}
4345 	if (lvb_in && ua->lksb.sb_lvbptr)
4346 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4347 
4348 	ua->xid = ua_tmp->xid;
4349 	ua->castparam = ua_tmp->castparam;
4350 	ua->castaddr = ua_tmp->castaddr;
4351 	ua->bastparam = ua_tmp->bastparam;
4352 	ua->bastaddr = ua_tmp->bastaddr;
4353 	ua->user_lksb = ua_tmp->user_lksb;
4354 	ua->old_mode = lkb->lkb_grmode;
4355 
4356 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4357 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
4358 	if (error)
4359 		goto out_put;
4360 
4361 	error = convert_lock(ls, lkb, &args);
4362 
4363 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4364 		error = 0;
4365  out_put:
4366 	dlm_put_lkb(lkb);
4367  out:
4368 	dlm_unlock_recovery(ls);
4369 	kfree(ua_tmp);
4370 	return error;
4371 }
4372 
4373 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4374 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4375 {
4376 	struct dlm_lkb *lkb;
4377 	struct dlm_args args;
4378 	struct dlm_user_args *ua;
4379 	int error;
4380 
4381 	dlm_lock_recovery(ls);
4382 
4383 	error = find_lkb(ls, lkid, &lkb);
4384 	if (error)
4385 		goto out;
4386 
4387 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4388 
4389 	if (lvb_in && ua->lksb.sb_lvbptr)
4390 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4391 	ua->castparam = ua_tmp->castparam;
4392 	ua->user_lksb = ua_tmp->user_lksb;
4393 
4394 	error = set_unlock_args(flags, ua, &args);
4395 	if (error)
4396 		goto out_put;
4397 
4398 	error = unlock_lock(ls, lkb, &args);
4399 
4400 	if (error == -DLM_EUNLOCK)
4401 		error = 0;
4402 	/* from validate_unlock_args() */
4403 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4404 		error = 0;
4405 	if (error)
4406 		goto out_put;
4407 
4408 	spin_lock(&ua->proc->locks_spin);
4409 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4410 	if (!list_empty(&lkb->lkb_ownqueue))
4411 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4412 	spin_unlock(&ua->proc->locks_spin);
4413  out_put:
4414 	dlm_put_lkb(lkb);
4415  out:
4416 	dlm_unlock_recovery(ls);
4417 	kfree(ua_tmp);
4418 	return error;
4419 }
4420 
4421 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4422 		    uint32_t flags, uint32_t lkid)
4423 {
4424 	struct dlm_lkb *lkb;
4425 	struct dlm_args args;
4426 	struct dlm_user_args *ua;
4427 	int error;
4428 
4429 	dlm_lock_recovery(ls);
4430 
4431 	error = find_lkb(ls, lkid, &lkb);
4432 	if (error)
4433 		goto out;
4434 
4435 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4436 	ua->castparam = ua_tmp->castparam;
4437 	ua->user_lksb = ua_tmp->user_lksb;
4438 
4439 	error = set_unlock_args(flags, ua, &args);
4440 	if (error)
4441 		goto out_put;
4442 
4443 	error = cancel_lock(ls, lkb, &args);
4444 
4445 	if (error == -DLM_ECANCEL)
4446 		error = 0;
4447 	/* from validate_unlock_args() */
4448 	if (error == -EBUSY)
4449 		error = 0;
4450  out_put:
4451 	dlm_put_lkb(lkb);
4452  out:
4453 	dlm_unlock_recovery(ls);
4454 	kfree(ua_tmp);
4455 	return error;
4456 }
4457 
4458 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4459 {
4460 	struct dlm_lkb *lkb;
4461 	struct dlm_args args;
4462 	struct dlm_user_args *ua;
4463 	struct dlm_rsb *r;
4464 	int error;
4465 
4466 	dlm_lock_recovery(ls);
4467 
4468 	error = find_lkb(ls, lkid, &lkb);
4469 	if (error)
4470 		goto out;
4471 
4472 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
4473 
4474 	error = set_unlock_args(flags, ua, &args);
4475 	if (error)
4476 		goto out_put;
4477 
4478 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4479 
4480 	r = lkb->lkb_resource;
4481 	hold_rsb(r);
4482 	lock_rsb(r);
4483 
4484 	error = validate_unlock_args(lkb, &args);
4485 	if (error)
4486 		goto out_r;
4487 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4488 
4489 	error = _cancel_lock(r, lkb);
4490  out_r:
4491 	unlock_rsb(r);
4492 	put_rsb(r);
4493 
4494 	if (error == -DLM_ECANCEL)
4495 		error = 0;
4496 	/* from validate_unlock_args() */
4497 	if (error == -EBUSY)
4498 		error = 0;
4499  out_put:
4500 	dlm_put_lkb(lkb);
4501  out:
4502 	dlm_unlock_recovery(ls);
4503 	return error;
4504 }
4505 
4506 /* lkb's that are removed from the waiters list by revert are just left on the
4507    orphans list with the granted orphan locks, to be freed by purge */
4508 
4509 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4510 {
4511 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4512 	struct dlm_args args;
4513 	int error;
4514 
4515 	hold_lkb(lkb);
4516 	mutex_lock(&ls->ls_orphans_mutex);
4517 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4518 	mutex_unlock(&ls->ls_orphans_mutex);
4519 
4520 	set_unlock_args(0, ua, &args);
4521 
4522 	error = cancel_lock(ls, lkb, &args);
4523 	if (error == -DLM_ECANCEL)
4524 		error = 0;
4525 	return error;
4526 }
4527 
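/* This is how a persistent lock survives the death of its process: the
   lkb is parked on ls_orphans (with an extra reference) and cancel_lock()
   aborts any still-pending request or conversion (a conversion reverting
   to its granted mode).  Orphans are freed later by do_purge() (see
   dlm_user_purge()). */
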
4528 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4529    Regardless of what rsb queue the lock is on, it's removed and freed. */
4530 
4531 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4532 {
4533 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4534 	struct dlm_args args;
4535 	int error;
4536 
4537 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4538 
4539 	error = unlock_lock(ls, lkb, &args);
4540 	if (error == -DLM_EUNLOCK)
4541 		error = 0;
4542 	return error;
4543 }
4544 
4545 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4546    (which does lock_rsb) due to deadlock with receiving a message that does
4547    lock_rsb followed by dlm_user_add_ast() */
4548 
4549 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4550 				     struct dlm_user_proc *proc)
4551 {
4552 	struct dlm_lkb *lkb = NULL;
4553 
4554 	mutex_lock(&ls->ls_clear_proc_locks);
4555 	if (list_empty(&proc->locks))
4556 		goto out;
4557 
4558 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4559 	list_del_init(&lkb->lkb_ownqueue);
4560 
4561 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4562 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4563 	else
4564 		lkb->lkb_flags |= DLM_IFL_DEAD;
4565  out:
4566 	mutex_unlock(&ls->ls_clear_proc_locks);
4567 	return lkb;
4568 }
4569 
4570 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast(), which
4571    1) references lkb->ua, which we free here, and 2) adds lkbs to
4572    proc->asts, which we clear here. */
4573 
4574 /* the proc CLOSING flag is set so no more device_reads should look at
4575    proc->asts list, and no more device_writes should add lkb's to
4576    proc->locks list; so we shouldn't need to take asts_spin or locks_spin
4577    here.  this assumes that device reads/writes/closes are serialized --
4578    FIXME: we may need to serialize them ourselves. */
4579 
4580 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4581 {
4582 	struct dlm_lkb *lkb, *safe;
4583 
4584 	dlm_lock_recovery(ls);
4585 
4586 	while (1) {
4587 		lkb = del_proc_lock(ls, proc);
4588 		if (!lkb)
4589 			break;
4590 		del_timeout(lkb);
4591 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4592 			orphan_proc_lock(ls, lkb);
4593 		else
4594 			unlock_proc_lock(ls, lkb);
4595 
4596 		/* this removes the reference for the proc->locks list
4597 		   added by dlm_user_request, it may result in the lkb
4598 		   being freed */
4599 
4600 		dlm_put_lkb(lkb);
4601 	}
4602 
4603 	mutex_lock(&ls->ls_clear_proc_locks);
4604 
4605 	/* in-progress unlocks */
4606 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4607 		list_del_init(&lkb->lkb_ownqueue);
4608 		lkb->lkb_flags |= DLM_IFL_DEAD;
4609 		dlm_put_lkb(lkb);
4610 	}
4611 
4612 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4613 		list_del(&lkb->lkb_astqueue);
4614 		dlm_put_lkb(lkb);
4615 	}
4616 
4617 	mutex_unlock(&ls->ls_clear_proc_locks);
4618 	dlm_unlock_recovery(ls);
4619 }
4620 
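/* Contrast this with purge_proc_locks() below, reached through
   dlm_user_purge() when a process purges its own locks: that path
   force-unlocks everything, persistent locks included, rather than
   orphaning them. */
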
4621 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4622 {
4623 	struct dlm_lkb *lkb, *safe;
4624 
4625 	while (1) {
4626 		lkb = NULL;
4627 		spin_lock(&proc->locks_spin);
4628 		if (!list_empty(&proc->locks)) {
4629 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4630 					 lkb_ownqueue);
4631 			list_del_init(&lkb->lkb_ownqueue);
4632 		}
4633 		spin_unlock(&proc->locks_spin);
4634 
4635 		if (!lkb)
4636 			break;
4637 
4638 		lkb->lkb_flags |= DLM_IFL_DEAD;
4639 		unlock_proc_lock(ls, lkb);
4640 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4641 	}
4642 
4643 	spin_lock(&proc->locks_spin);
4644 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4645 		list_del_init(&lkb->lkb_ownqueue);
4646 		lkb->lkb_flags |= DLM_IFL_DEAD;
4647 		dlm_put_lkb(lkb);
4648 	}
4649 	spin_unlock(&proc->locks_spin);
4650 
4651 	spin_lock(&proc->asts_spin);
4652 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4653 		list_del(&lkb->lkb_astqueue);
4654 		dlm_put_lkb(lkb);
4655 	}
4656 	spin_unlock(&proc->asts_spin);
4657 }
4658 
4659 /* pid of 0 means purge all orphans */
4660 
4661 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4662 {
4663 	struct dlm_lkb *lkb, *safe;
4664 
4665 	mutex_lock(&ls->ls_orphans_mutex);
4666 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4667 		if (pid && lkb->lkb_ownpid != pid)
4668 			continue;
4669 		unlock_proc_lock(ls, lkb);
4670 		list_del_init(&lkb->lkb_ownqueue);
4671 		dlm_put_lkb(lkb);
4672 	}
4673 	mutex_unlock(&ls->ls_orphans_mutex);
4674 }
4675 
4676 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4677 {
4678 	struct dlm_message *ms;
4679 	struct dlm_mhandle *mh;
4680 	int error;
4681 
4682 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4683 				DLM_MSG_PURGE, &ms, &mh);
4684 	if (error)
4685 		return error;
4686 	ms->m_nodeid = nodeid;
4687 	ms->m_pid = pid;
4688 
4689 	return send_message(mh, ms);
4690 }
4691 
4692 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4693 		   int nodeid, int pid)
4694 {
4695 	int error = 0;
4696 
4697 	if (nodeid != dlm_our_nodeid()) {
4698 		error = send_purge(ls, nodeid, pid);
4699 	} else {
4700 		dlm_lock_recovery(ls);
4701 		if (pid == current->pid)
4702 			purge_proc_locks(ls, proc);
4703 		else
4704 			do_purge(ls, nodeid, pid);
4705 		dlm_unlock_recovery(ls);
4706 	}
4707 	return error;
4708 }
4709 
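/* Purge routing in short: a purge aimed at another node is carried by a
   DLM_MSG_PURGE message and runs do_purge() on that node via its
   receive path; locally, the calling process's own locks go through
   purge_proc_locks() while another pid's orphans go through do_purge()
   directly. */
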
4710