xref: /openbmc/linux/fs/ocfs2/dlm/dlmmaster.c (revision 82ced6fd)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmaster.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42 
43 
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47 
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdomain.h"
51 #include "dlmdebug.h"
52 
53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
54 #include "cluster/masklog.h"
55 
56 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
57 			      struct dlm_master_list_entry *mle,
58 			      struct o2nm_node *node,
59 			      int idx);
60 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
61 			    struct dlm_master_list_entry *mle,
62 			    struct o2nm_node *node,
63 			    int idx);
64 
65 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
66 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
67 				struct dlm_lock_resource *res,
68 				void *nodemap, u32 flags);
69 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
70 
71 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
72 				struct dlm_master_list_entry *mle,
73 				const char *name,
74 				unsigned int namelen)
75 {
76 	if (dlm != mle->dlm)
77 		return 0;
78 
79 	if (namelen != mle->mnamelen ||
80 	    memcmp(name, mle->mname, namelen) != 0)
81 		return 0;
82 
83 	return 1;
84 }
85 
86 static struct kmem_cache *dlm_lockres_cache = NULL;
87 static struct kmem_cache *dlm_lockname_cache = NULL;
88 static struct kmem_cache *dlm_mle_cache = NULL;
89 
90 static void dlm_mle_release(struct kref *kref);
91 static void dlm_init_mle(struct dlm_master_list_entry *mle,
92 			enum dlm_mle_type type,
93 			struct dlm_ctxt *dlm,
94 			struct dlm_lock_resource *res,
95 			const char *name,
96 			unsigned int namelen);
97 static void dlm_put_mle(struct dlm_master_list_entry *mle);
98 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
99 static int dlm_find_mle(struct dlm_ctxt *dlm,
100 			struct dlm_master_list_entry **mle,
101 			char *name, unsigned int namelen);
102 
103 static int dlm_do_master_request(struct dlm_lock_resource *res,
104 				 struct dlm_master_list_entry *mle, int to);
105 
106 
107 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
108 				     struct dlm_lock_resource *res,
109 				     struct dlm_master_list_entry *mle,
110 				     int *blocked);
111 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
112 				    struct dlm_lock_resource *res,
113 				    struct dlm_master_list_entry *mle,
114 				    int blocked);
115 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
116 				 struct dlm_lock_resource *res,
117 				 struct dlm_master_list_entry *mle,
118 				 struct dlm_master_list_entry **oldmle,
119 				 const char *name, unsigned int namelen,
120 				 u8 new_master, u8 master);
121 
122 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
123 				    struct dlm_lock_resource *res);
124 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
125 				      struct dlm_lock_resource *res);
126 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
127 				       struct dlm_lock_resource *res,
128 				       u8 target);
129 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
130 				       struct dlm_lock_resource *res);
131 
132 
133 int dlm_is_host_down(int errno)
134 {
135 	switch (errno) {
136 		case -EBADF:
137 		case -ECONNREFUSED:
138 		case -ENOTCONN:
139 		case -ECONNRESET:
140 		case -EPIPE:
141 		case -EHOSTDOWN:
142 		case -EHOSTUNREACH:
143 		case -ETIMEDOUT:
144 		case -ECONNABORTED:
145 		case -ENETDOWN:
146 		case -ENETUNREACH:
147 		case -ENETRESET:
148 		case -ESHUTDOWN:
149 		case -ENOPROTOOPT:
150 		case -EINVAL:   /* if returned from our tcp code,
151 				   this means there is no socket */
152 			return 1;
153 	}
154 	return 0;
155 }
156 
157 
158 /*
159  * MASTER LIST FUNCTIONS
160  */
161 
162 
163 /*
164  * regarding master list entries and heartbeat callbacks:
165  *
166  * in order to avoid sleeping and allocation that occurs in
167  * heartbeat, master list entries are simply attached to the
168  * dlm's established heartbeat callbacks.  the mle is attached
169  * when it is created, and since the dlm->spinlock is held at
170  * that time, any heartbeat event will be properly discovered
171  * by the mle.  the mle needs to be detached from the
172  * dlm->mle_hb_events list as soon as heartbeat events are no
173  * longer useful to the mle, and before the mle is freed.
174  *
175  * as a general rule, heartbeat events are no longer needed by
176  * the mle once an "answer" regarding the lock master has been
177  * received.
178  */
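
/*
 * Illustrative lifecycle sketch (editor's addition, not called anywhere):
 * the attach happens inside dlm_init_mle() while the caller already holds
 * dlm->spinlock, and the detach must happen before the final put, once an
 * answer about the master has arrived.  All helpers named below are
 * defined in this file.
 *
 *	spin_lock(&dlm->spinlock);
 *	spin_lock(&dlm->master_lock);
 *	dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 *	__dlm_insert_mle(dlm, mle);
 *	spin_unlock(&dlm->master_lock);
 *	spin_unlock(&dlm->spinlock);
 *	...
 *	dlm_mle_detach_hb_events(dlm, mle);	(answer received)
 *	dlm_put_mle(mle);
 */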
179 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
180 					      struct dlm_master_list_entry *mle)
181 {
182 	assert_spin_locked(&dlm->spinlock);
183 
184 	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
185 }
186 
187 
188 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
189 					      struct dlm_master_list_entry *mle)
190 {
191 	if (!list_empty(&mle->hb_events))
192 		list_del_init(&mle->hb_events);
193 }
194 
195 
196 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
197 					    struct dlm_master_list_entry *mle)
198 {
199 	spin_lock(&dlm->spinlock);
200 	__dlm_mle_detach_hb_events(dlm, mle);
201 	spin_unlock(&dlm->spinlock);
202 }
203 
204 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
205 {
206 	struct dlm_ctxt *dlm;
207 	dlm = mle->dlm;
208 
209 	assert_spin_locked(&dlm->spinlock);
210 	assert_spin_locked(&dlm->master_lock);
211 	mle->inuse++;
212 	kref_get(&mle->mle_refs);
213 }
214 
215 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
216 {
217 	struct dlm_ctxt *dlm;
218 	dlm = mle->dlm;
219 
220 	spin_lock(&dlm->spinlock);
221 	spin_lock(&dlm->master_lock);
222 	mle->inuse--;
223 	__dlm_put_mle(mle);
224 	spin_unlock(&dlm->master_lock);
225 	spin_unlock(&dlm->spinlock);
226 
227 }
228 
229 /* remove from list and free */
230 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
231 {
232 	struct dlm_ctxt *dlm;
233 	dlm = mle->dlm;
234 
235 	assert_spin_locked(&dlm->spinlock);
236 	assert_spin_locked(&dlm->master_lock);
237 	if (!atomic_read(&mle->mle_refs.refcount)) {
238 		/* this may or may not crash, but who cares.
239 		 * it's a BUG. */
240 		mlog(ML_ERROR, "bad mle: %p\n", mle);
241 		dlm_print_one_mle(mle);
242 		BUG();
243 	} else
244 		kref_put(&mle->mle_refs, dlm_mle_release);
245 }
246 
247 
248 /* must not have any spinlocks coming in */
249 static void dlm_put_mle(struct dlm_master_list_entry *mle)
250 {
251 	struct dlm_ctxt *dlm;
252 	dlm = mle->dlm;
253 
254 	spin_lock(&dlm->spinlock);
255 	spin_lock(&dlm->master_lock);
256 	__dlm_put_mle(mle);
257 	spin_unlock(&dlm->master_lock);
258 	spin_unlock(&dlm->spinlock);
259 }
260 
261 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
262 {
263 	kref_get(&mle->mle_refs);
264 }
265 
266 static void dlm_init_mle(struct dlm_master_list_entry *mle,
267 			enum dlm_mle_type type,
268 			struct dlm_ctxt *dlm,
269 			struct dlm_lock_resource *res,
270 			const char *name,
271 			unsigned int namelen)
272 {
273 	assert_spin_locked(&dlm->spinlock);
274 
275 	mle->dlm = dlm;
276 	mle->type = type;
277 	INIT_HLIST_NODE(&mle->master_hash_node);
278 	INIT_LIST_HEAD(&mle->hb_events);
279 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
280 	spin_lock_init(&mle->spinlock);
281 	init_waitqueue_head(&mle->wq);
282 	atomic_set(&mle->woken, 0);
283 	kref_init(&mle->mle_refs);
284 	memset(mle->response_map, 0, sizeof(mle->response_map));
285 	mle->master = O2NM_MAX_NODES;
286 	mle->new_master = O2NM_MAX_NODES;
287 	mle->inuse = 0;
288 
289 	BUG_ON(mle->type != DLM_MLE_BLOCK &&
290 	       mle->type != DLM_MLE_MASTER &&
291 	       mle->type != DLM_MLE_MIGRATION);
292 
293 	if (mle->type == DLM_MLE_MASTER) {
294 		BUG_ON(!res);
295 		mle->mleres = res;
296 		memcpy(mle->mname, res->lockname.name, res->lockname.len);
297 		mle->mnamelen = res->lockname.len;
298 		mle->mnamehash = res->lockname.hash;
299 	} else {
300 		BUG_ON(!name);
301 		mle->mleres = NULL;
302 		memcpy(mle->mname, name, namelen);
303 		mle->mnamelen = namelen;
304 		mle->mnamehash = dlm_lockid_hash(name, namelen);
305 	}
306 
307 	atomic_inc(&dlm->mle_tot_count[mle->type]);
308 	atomic_inc(&dlm->mle_cur_count[mle->type]);
309 
310 	/* copy off the node_map and register hb callbacks on our copy */
311 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
312 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
313 	clear_bit(dlm->node_num, mle->vote_map);
314 	clear_bit(dlm->node_num, mle->node_map);
315 
316 	/* attach the mle to the domain node up/down events */
317 	__dlm_mle_attach_hb_events(dlm, mle);
318 }
319 
320 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
321 {
322 	assert_spin_locked(&dlm->spinlock);
323 	assert_spin_locked(&dlm->master_lock);
324 
325 	if (!hlist_unhashed(&mle->master_hash_node))
326 		hlist_del_init(&mle->master_hash_node);
327 }
328 
329 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
330 {
331 	struct hlist_head *bucket;
332 
333 	assert_spin_locked(&dlm->master_lock);
334 
335 	bucket = dlm_master_hash(dlm, mle->mnamehash);
336 	hlist_add_head(&mle->master_hash_node, bucket);
337 }
338 
339 /* returns 1 if found, 0 if not */
340 static int dlm_find_mle(struct dlm_ctxt *dlm,
341 			struct dlm_master_list_entry **mle,
342 			char *name, unsigned int namelen)
343 {
344 	struct dlm_master_list_entry *tmpmle;
345 	struct hlist_head *bucket;
346 	struct hlist_node *list;
347 	unsigned int hash;
348 
349 	assert_spin_locked(&dlm->master_lock);
350 
351 	hash = dlm_lockid_hash(name, namelen);
352 	bucket = dlm_master_hash(dlm, hash);
353 	hlist_for_each(list, bucket) {
354 		tmpmle = hlist_entry(list, struct dlm_master_list_entry,
355 				     master_hash_node);
356 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
357 			continue;
358 		dlm_get_mle(tmpmle);
359 		*mle = tmpmle;
360 		return 1;
361 	}
362 	return 0;
363 }
364 
365 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
366 {
367 	struct dlm_master_list_entry *mle;
368 
369 	assert_spin_locked(&dlm->spinlock);
370 
371 	list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
372 		if (node_up)
373 			dlm_mle_node_up(dlm, mle, NULL, idx);
374 		else
375 			dlm_mle_node_down(dlm, mle, NULL, idx);
376 	}
377 }
378 
379 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
380 			      struct dlm_master_list_entry *mle,
381 			      struct o2nm_node *node, int idx)
382 {
383 	spin_lock(&mle->spinlock);
384 
385 	if (!test_bit(idx, mle->node_map))
386 		mlog(0, "node %u already removed from nodemap!\n", idx);
387 	else
388 		clear_bit(idx, mle->node_map);
389 
390 	spin_unlock(&mle->spinlock);
391 }
392 
393 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
394 			    struct dlm_master_list_entry *mle,
395 			    struct o2nm_node *node, int idx)
396 {
397 	spin_lock(&mle->spinlock);
398 
399 	if (test_bit(idx, mle->node_map))
400 		mlog(0, "node %u already in node map!\n", idx);
401 	else
402 		set_bit(idx, mle->node_map);
403 
404 	spin_unlock(&mle->spinlock);
405 }
406 
407 
408 int dlm_init_mle_cache(void)
409 {
410 	dlm_mle_cache = kmem_cache_create("o2dlm_mle",
411 					  sizeof(struct dlm_master_list_entry),
412 					  0, SLAB_HWCACHE_ALIGN,
413 					  NULL);
414 	if (dlm_mle_cache == NULL)
415 		return -ENOMEM;
416 	return 0;
417 }
418 
419 void dlm_destroy_mle_cache(void)
420 {
421 	if (dlm_mle_cache)
422 		kmem_cache_destroy(dlm_mle_cache);
423 }
424 
425 static void dlm_mle_release(struct kref *kref)
426 {
427 	struct dlm_master_list_entry *mle;
428 	struct dlm_ctxt *dlm;
429 
430 	mlog_entry_void();
431 
432 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
433 	dlm = mle->dlm;
434 
435 	assert_spin_locked(&dlm->spinlock);
436 	assert_spin_locked(&dlm->master_lock);
437 
438 	mlog(0, "Releasing mle for %.*s, type %d\n", mle->mnamelen, mle->mname,
439 	     mle->type);
440 
441 	/* remove from list if not already */
442 	__dlm_unlink_mle(dlm, mle);
443 
444 	/* detach the mle from the domain node up/down events */
445 	__dlm_mle_detach_hb_events(dlm, mle);
446 
447 	atomic_dec(&dlm->mle_cur_count[mle->type]);
448 
449 	/* NOTE: kfree under spinlock here.
450 	 * if this is bad, we can move this to a freelist. */
451 	kmem_cache_free(dlm_mle_cache, mle);
452 }
453 
454 
455 /*
456  * LOCK RESOURCE FUNCTIONS
457  */
458 
459 int dlm_init_master_caches(void)
460 {
461 	dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
462 					      sizeof(struct dlm_lock_resource),
463 					      0, SLAB_HWCACHE_ALIGN, NULL);
464 	if (!dlm_lockres_cache)
465 		goto bail;
466 
467 	dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
468 					       DLM_LOCKID_NAME_MAX, 0,
469 					       SLAB_HWCACHE_ALIGN, NULL);
470 	if (!dlm_lockname_cache)
471 		goto bail;
472 
473 	return 0;
474 bail:
475 	dlm_destroy_master_caches();
476 	return -ENOMEM;
477 }
478 
479 void dlm_destroy_master_caches(void)
480 {
481 	if (dlm_lockname_cache)
482 		kmem_cache_destroy(dlm_lockname_cache);
483 
484 	if (dlm_lockres_cache)
485 		kmem_cache_destroy(dlm_lockres_cache);
486 }
487 
488 static void dlm_lockres_release(struct kref *kref)
489 {
490 	struct dlm_lock_resource *res;
491 	struct dlm_ctxt *dlm;
492 
493 	res = container_of(kref, struct dlm_lock_resource, refs);
494 	dlm = res->dlm;
495 
496 	/* This should not happen -- all lockres' have a name
497 	 * associated with them at init time. */
498 	BUG_ON(!res->lockname.name);
499 
500 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
501 	     res->lockname.name);
502 
503 	spin_lock(&dlm->track_lock);
504 	if (!list_empty(&res->tracking))
505 		list_del_init(&res->tracking);
506 	else {
507 		mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
508 		     res->lockname.len, res->lockname.name);
509 		dlm_print_one_lock_resource(res);
510 	}
511 	spin_unlock(&dlm->track_lock);
512 
513 	atomic_dec(&dlm->res_cur_count);
514 
515 	dlm_put(dlm);
516 
517 	if (!hlist_unhashed(&res->hash_node) ||
518 	    !list_empty(&res->granted) ||
519 	    !list_empty(&res->converting) ||
520 	    !list_empty(&res->blocked) ||
521 	    !list_empty(&res->dirty) ||
522 	    !list_empty(&res->recovering) ||
523 	    !list_empty(&res->purge)) {
524 		mlog(ML_ERROR,
525 		     "Going to BUG for resource %.*s."
526 		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
527 		     res->lockname.len, res->lockname.name,
528 		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
529 		     !list_empty(&res->granted) ? 'G' : ' ',
530 		     !list_empty(&res->converting) ? 'C' : ' ',
531 		     !list_empty(&res->blocked) ? 'B' : ' ',
532 		     !list_empty(&res->dirty) ? 'D' : ' ',
533 		     !list_empty(&res->recovering) ? 'R' : ' ',
534 		     !list_empty(&res->purge) ? 'P' : ' ');
535 
536 		dlm_print_one_lock_resource(res);
537 	}
538 
539 	/* By the time we're ready to blow this guy away, we shouldn't
540 	 * be on any lists. */
541 	BUG_ON(!hlist_unhashed(&res->hash_node));
542 	BUG_ON(!list_empty(&res->granted));
543 	BUG_ON(!list_empty(&res->converting));
544 	BUG_ON(!list_empty(&res->blocked));
545 	BUG_ON(!list_empty(&res->dirty));
546 	BUG_ON(!list_empty(&res->recovering));
547 	BUG_ON(!list_empty(&res->purge));
548 
549 	kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
550 
551 	kmem_cache_free(dlm_lockres_cache, res);
552 }
553 
554 void dlm_lockres_put(struct dlm_lock_resource *res)
555 {
556 	kref_put(&res->refs, dlm_lockres_release);
557 }
558 
559 static void dlm_init_lockres(struct dlm_ctxt *dlm,
560 			     struct dlm_lock_resource *res,
561 			     const char *name, unsigned int namelen)
562 {
563 	char *qname;
564 
565 	/* If we memset here, we lose our reference to the kmalloc'd
566 	 * res->lockname.name, so be sure to init every field
567 	 * correctly! */
568 
569 	qname = (char *) res->lockname.name;
570 	memcpy(qname, name, namelen);
571 
572 	res->lockname.len = namelen;
573 	res->lockname.hash = dlm_lockid_hash(name, namelen);
574 
575 	init_waitqueue_head(&res->wq);
576 	spin_lock_init(&res->spinlock);
577 	INIT_HLIST_NODE(&res->hash_node);
578 	INIT_LIST_HEAD(&res->granted);
579 	INIT_LIST_HEAD(&res->converting);
580 	INIT_LIST_HEAD(&res->blocked);
581 	INIT_LIST_HEAD(&res->dirty);
582 	INIT_LIST_HEAD(&res->recovering);
583 	INIT_LIST_HEAD(&res->purge);
584 	INIT_LIST_HEAD(&res->tracking);
585 	atomic_set(&res->asts_reserved, 0);
586 	res->migration_pending = 0;
587 	res->inflight_locks = 0;
588 
589 	/* put in dlm_lockres_release */
590 	dlm_grab(dlm);
591 	res->dlm = dlm;
592 
593 	kref_init(&res->refs);
594 
595 	atomic_inc(&dlm->res_tot_count);
596 	atomic_inc(&dlm->res_cur_count);
597 
598 	/* just for consistency */
599 	spin_lock(&res->spinlock);
600 	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
601 	spin_unlock(&res->spinlock);
602 
603 	res->state = DLM_LOCK_RES_IN_PROGRESS;
604 
605 	res->last_used = 0;
606 
607 	spin_lock(&dlm->spinlock);
608 	list_add_tail(&res->tracking, &dlm->tracking_list);
609 	spin_unlock(&dlm->spinlock);
610 
611 	memset(res->lvb, 0, DLM_LVB_LEN);
612 	memset(res->refmap, 0, sizeof(res->refmap));
613 }
614 
615 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
616 				   const char *name,
617 				   unsigned int namelen)
618 {
619 	struct dlm_lock_resource *res = NULL;
620 
621 	res = (struct dlm_lock_resource *)
622 				kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
623 	if (!res)
624 		goto error;
625 
626 	res->lockname.name = (char *)
627 				kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
628 	if (!res->lockname.name)
629 		goto error;
630 
631 	dlm_init_lockres(dlm, res, name, namelen);
632 	return res;
633 
634 error:
635 	if (res && res->lockname.name)
636 		kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
637 
638 	if (res)
639 		kmem_cache_free(dlm_lockres_cache, res);
640 	return NULL;
641 }
642 
643 void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
644 				   struct dlm_lock_resource *res,
645 				   int new_lockres,
646 				   const char *file,
647 				   int line)
648 {
649 	if (!new_lockres)
650 		assert_spin_locked(&res->spinlock);
651 
652 	if (!test_bit(dlm->node_num, res->refmap)) {
653 		BUG_ON(res->inflight_locks != 0);
654 		dlm_lockres_set_refmap_bit(dlm->node_num, res);
655 	}
656 	res->inflight_locks++;
657 	mlog(0, "%s:%.*s: inflight++: now %u\n",
658 	     dlm->name, res->lockname.len, res->lockname.name,
659 	     res->inflight_locks);
660 }
661 
662 void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
663 				   struct dlm_lock_resource *res,
664 				   const char *file,
665 				   int line)
666 {
667 	assert_spin_locked(&res->spinlock);
668 
669 	BUG_ON(res->inflight_locks == 0);
670 	res->inflight_locks--;
671 	mlog(0, "%s:%.*s: inflight--: now %u\n",
672 	     dlm->name, res->lockname.len, res->lockname.name,
673 	     res->inflight_locks);
674 	if (res->inflight_locks == 0)
675 		dlm_lockres_clear_refmap_bit(dlm->node_num, res);
676 	wake_up(&res->wq);
677 }
678 
679 /*
680  * lookup a lock resource by name.
681  * may already exist in the hashtable.
682  * lockid is null terminated
683  *
684  * if not, allocate enough for the lockres and for
685  * the temporary structure used in doing the mastering.
686  *
687  * also, do a lookup in the dlm->master_list to see
688  * if another node has begun mastering the same lock.
689  * if so, there should be a block entry in there
690  * for this name, and we should *not* attempt to master
691  * the lock here.  we need to wait around for that node
692  * to assert_master (or die).
693  *
694  */
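
/*
 * Caller-side sketch (editor's addition; error handling simplified):
 * callers such as dlmlock() pass a NUL-terminated lockid and drop their
 * reference with dlm_lockres_put() once they are done with the resource.
 *
 *	res = dlm_get_lock_resource(dlm, lockid, namelen, flags);
 *	if (!res)
 *		goto error;		(allocation failed)
 *	...
 *	dlm_lockres_put(res);
 */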
695 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
696 					  const char *lockid,
697 					  int namelen,
698 					  int flags)
699 {
700 	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
701 	struct dlm_master_list_entry *mle = NULL;
702 	struct dlm_master_list_entry *alloc_mle = NULL;
703 	int blocked = 0;
704 	int ret, nodenum;
705 	struct dlm_node_iter iter;
706 	unsigned int hash;
707 	int tries = 0;
708 	int bit, wait_on_recovery = 0;
709 	int drop_inflight_if_nonlocal = 0;
710 
711 	BUG_ON(!lockid);
712 
713 	hash = dlm_lockid_hash(lockid, namelen);
714 
715 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
716 
717 lookup:
718 	spin_lock(&dlm->spinlock);
719 	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
720 	if (tmpres) {
721 		int dropping_ref = 0;
722 
723 		spin_unlock(&dlm->spinlock);
724 
725 		spin_lock(&tmpres->spinlock);
726 		/* We wait for the other thread that is mastering the resource */
727 		if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
728 			__dlm_wait_on_lockres(tmpres);
729 			BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
730 		}
731 
732 		if (tmpres->owner == dlm->node_num) {
733 			BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
734 			dlm_lockres_grab_inflight_ref(dlm, tmpres);
735 		} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
736 			dropping_ref = 1;
737 		spin_unlock(&tmpres->spinlock);
738 
739 		/* wait until done messaging the master, drop our ref to allow
740 		 * the lockres to be purged, start over. */
741 		if (dropping_ref) {
742 			spin_lock(&tmpres->spinlock);
743 			__dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
744 			spin_unlock(&tmpres->spinlock);
745 			dlm_lockres_put(tmpres);
746 			tmpres = NULL;
747 			goto lookup;
748 		}
749 
750 		mlog(0, "found in hash!\n");
751 		if (res)
752 			dlm_lockres_put(res);
753 		res = tmpres;
754 		goto leave;
755 	}
756 
757 	if (!res) {
758 		spin_unlock(&dlm->spinlock);
759 		mlog(0, "allocating a new resource\n");
760 		/* nothing found and we need to allocate one. */
761 		alloc_mle = (struct dlm_master_list_entry *)
762 			kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
763 		if (!alloc_mle)
764 			goto leave;
765 		res = dlm_new_lockres(dlm, lockid, namelen);
766 		if (!res)
767 			goto leave;
768 		goto lookup;
769 	}
770 
771 	mlog(0, "no lockres found, allocated our own: %p\n", res);
772 
773 	if (flags & LKM_LOCAL) {
774 		/* caller knows it's safe to assume it's not mastered elsewhere
775 		 * DONE!  return right away */
776 		spin_lock(&res->spinlock);
777 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
778 		__dlm_insert_lockres(dlm, res);
779 		dlm_lockres_grab_inflight_ref(dlm, res);
780 		spin_unlock(&res->spinlock);
781 		spin_unlock(&dlm->spinlock);
782 		/* lockres still marked IN_PROGRESS */
783 		goto wake_waiters;
784 	}
785 
786 	/* check master list to see if another node has started mastering it */
787 	spin_lock(&dlm->master_lock);
788 
789 	/* if we found a block, wait for lock to be mastered by another node */
790 	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
791 	if (blocked) {
792 		int mig;
793 		if (mle->type == DLM_MLE_MASTER) {
794 			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
795 			BUG();
796 		}
797 		mig = (mle->type == DLM_MLE_MIGRATION);
798 		/* if there is a migration in progress, let the migration
799 		 * finish before continuing.  we can wait for the absence
800 		 * of the MIGRATION mle: either the migrate finished or
801 		 * one of the nodes died and the mle was cleaned up.
802 		 * if there is a BLOCK here, but it already has a master
803 		 * set, we are too late.  the master does not have a ref
804 		 * for us in the refmap.  detach the mle and drop it.
805 		 * either way, go back to the top and start over. */
806 		if (mig || mle->master != O2NM_MAX_NODES) {
807 			BUG_ON(mig && mle->master == dlm->node_num);
808 			/* we arrived too late.  the master does not
809 			 * have a ref for us. retry. */
810 			mlog(0, "%s:%.*s: late on %s\n",
811 			     dlm->name, namelen, lockid,
812 			     mig ?  "MIGRATION" : "BLOCK");
813 			spin_unlock(&dlm->master_lock);
814 			spin_unlock(&dlm->spinlock);
815 
816 			/* master is known, detach */
817 			if (!mig)
818 				dlm_mle_detach_hb_events(dlm, mle);
819 			dlm_put_mle(mle);
820 			mle = NULL;
821 			/* this is lame, but we can't wait on either
822 			 * the mle or lockres waitqueue here */
823 			if (mig)
824 				msleep(100);
825 			goto lookup;
826 		}
827 	} else {
828 		/* go ahead and try to master lock on this node */
829 		mle = alloc_mle;
830 		/* make sure this does not get freed below */
831 		alloc_mle = NULL;
832 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
833 		set_bit(dlm->node_num, mle->maybe_map);
834 		__dlm_insert_mle(dlm, mle);
835 
836 		/* still holding the dlm spinlock, check the recovery map
837 		 * to see if there are any nodes that still need to be
838 		 * considered.  these will not appear in the mle nodemap
839 		 * but they might own this lockres.  wait on them. */
840 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
841 		if (bit < O2NM_MAX_NODES) {
842 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
843 			     "recover before lock mastery can begin\n",
844 			     dlm->name, namelen, (char *)lockid, bit);
845 			wait_on_recovery = 1;
846 		}
847 	}
848 
849 	/* at this point there is either a DLM_MLE_BLOCK or a
850 	 * DLM_MLE_MASTER on the master list, so it's safe to add the
851 	 * lockres to the hashtable.  anyone who finds the lock will
852 	 * still have to wait on the IN_PROGRESS. */
853 
854 	/* finally add the lockres to its hash bucket */
855 	__dlm_insert_lockres(dlm, res);
856 	/* since this lockres is new it does not require the spinlock */
857 	dlm_lockres_grab_inflight_ref_new(dlm, res);
858 
859 	/* if this node does not become the master make sure to drop
860 	 * this inflight reference below */
861 	drop_inflight_if_nonlocal = 1;
862 
863 	/* get an extra ref on the mle in case this is a BLOCK
864 	 * if so, the creator of the BLOCK may try to put the last
865 	 * ref at this time in the assert master handler, so we
866 	 * need an extra one to keep from a bad ptr deref. */
867 	dlm_get_mle_inuse(mle);
868 	spin_unlock(&dlm->master_lock);
869 	spin_unlock(&dlm->spinlock);
870 
871 redo_request:
872 	while (wait_on_recovery) {
873 		/* any cluster changes that occurred after dropping the
874 		 * dlm spinlock would be detectable by a change on the mle,
875 		 * so we only need to clear out the recovery map once. */
876 		if (dlm_is_recovery_lock(lockid, namelen)) {
877 			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
878 			     "must master $RECOVERY lock now\n", dlm->name);
879 			if (!dlm_pre_master_reco_lockres(dlm, res))
880 				wait_on_recovery = 0;
881 			else {
882 				mlog(0, "%s: waiting 500ms for heartbeat state "
883 				    "change\n", dlm->name);
884 				msleep(500);
885 			}
886 			continue;
887 		}
888 
889 		dlm_kick_recovery_thread(dlm);
890 		msleep(1000);
891 		dlm_wait_for_recovery(dlm);
892 
893 		spin_lock(&dlm->spinlock);
894 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
895 		if (bit < O2NM_MAX_NODES) {
896 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
897 			     "recover before lock mastery can begin\n",
898 			     dlm->name, namelen, (char *)lockid, bit);
899 			wait_on_recovery = 1;
900 		} else
901 			wait_on_recovery = 0;
902 		spin_unlock(&dlm->spinlock);
903 
904 		if (wait_on_recovery)
905 			dlm_wait_for_node_recovery(dlm, bit, 10000);
906 	}
907 
908 	/* must wait for lock to be mastered elsewhere */
909 	if (blocked)
910 		goto wait;
911 
912 	ret = -EINVAL;
913 	dlm_node_iter_init(mle->vote_map, &iter);
914 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
915 		ret = dlm_do_master_request(res, mle, nodenum);
916 		if (ret < 0)
917 			mlog_errno(ret);
918 		if (mle->master != O2NM_MAX_NODES) {
919 			/* found a master ! */
920 			if (mle->master <= nodenum)
921 				break;
922 			/* if our master request has not reached the master
923 			 * yet, keep going until it does.  this is how the
924 			 * master will know that asserts are needed back to
925 			 * the lower nodes. */
926 			mlog(0, "%s:%.*s: requests only up to %u but master "
927 			     "is %u, keep going\n", dlm->name, namelen,
928 			     lockid, nodenum, mle->master);
929 		}
930 	}
931 
932 wait:
933 	/* keep going until the response map includes all nodes */
934 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
935 	if (ret < 0) {
936 		wait_on_recovery = 1;
937 		mlog(0, "%s:%.*s: node map changed, redo the "
938 		     "master request now, blocked=%d\n",
939 		     dlm->name, res->lockname.len,
940 		     res->lockname.name, blocked);
941 		if (++tries > 20) {
942 			mlog(ML_ERROR, "%s:%.*s: spinning on "
943 			     "dlm_wait_for_lock_mastery, blocked=%d\n",
944 			     dlm->name, res->lockname.len,
945 			     res->lockname.name, blocked);
946 			dlm_print_one_lock_resource(res);
947 			dlm_print_one_mle(mle);
948 			tries = 0;
949 		}
950 		goto redo_request;
951 	}
952 
953 	mlog(0, "lockres mastered by %u\n", res->owner);
954 	/* make sure we never continue without this */
955 	BUG_ON(res->owner == O2NM_MAX_NODES);
956 
957 	/* master is known, detach if not already detached */
958 	dlm_mle_detach_hb_events(dlm, mle);
959 	dlm_put_mle(mle);
960 	/* put the extra ref */
961 	dlm_put_mle_inuse(mle);
962 
963 wake_waiters:
964 	spin_lock(&res->spinlock);
965 	if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
966 		dlm_lockres_drop_inflight_ref(dlm, res);
967 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
968 	spin_unlock(&res->spinlock);
969 	wake_up(&res->wq);
970 
971 leave:
972 	/* need to free the unused mle */
973 	if (alloc_mle)
974 		kmem_cache_free(dlm_mle_cache, alloc_mle);
975 
976 	return res;
977 }
978 
979 
980 #define DLM_MASTERY_TIMEOUT_MS   5000
981 
982 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
983 				     struct dlm_lock_resource *res,
984 				     struct dlm_master_list_entry *mle,
985 				     int *blocked)
986 {
987 	u8 m;
988 	int ret, bit;
989 	int map_changed, voting_done;
990 	int assert, sleep;
991 
992 recheck:
993 	ret = 0;
994 	assert = 0;
995 
996 	/* check if another node has already become the owner */
997 	spin_lock(&res->spinlock);
998 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
999 		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1000 		     res->lockname.len, res->lockname.name, res->owner);
1001 		spin_unlock(&res->spinlock);
1002 		/* this will cause the master to re-assert across
1003 		 * the whole cluster, freeing up mles */
1004 		if (res->owner != dlm->node_num) {
1005 			ret = dlm_do_master_request(res, mle, res->owner);
1006 			if (ret < 0) {
1007 				/* give recovery a chance to run */
1008 				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1009 				msleep(500);
1010 				goto recheck;
1011 			}
1012 		}
1013 		ret = 0;
1014 		goto leave;
1015 	}
1016 	spin_unlock(&res->spinlock);
1017 
1018 	spin_lock(&mle->spinlock);
1019 	m = mle->master;
1020 	map_changed = (memcmp(mle->vote_map, mle->node_map,
1021 			      sizeof(mle->vote_map)) != 0);
1022 	voting_done = (memcmp(mle->vote_map, mle->response_map,
1023 			     sizeof(mle->vote_map)) == 0);
1024 
1025 	/* restart if we hit any errors */
1026 	if (map_changed) {
1027 		int b;
1028 		mlog(0, "%s: %.*s: node map changed, restarting\n",
1029 		     dlm->name, res->lockname.len, res->lockname.name);
1030 		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1031 		b = (mle->type == DLM_MLE_BLOCK);
1032 		if ((*blocked && !b) || (!*blocked && b)) {
1033 			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1034 			     dlm->name, res->lockname.len, res->lockname.name,
1035 			     *blocked, b);
1036 			*blocked = b;
1037 		}
1038 		spin_unlock(&mle->spinlock);
1039 		if (ret < 0) {
1040 			mlog_errno(ret);
1041 			goto leave;
1042 		}
1043 		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1044 		     "rechecking now\n", dlm->name, res->lockname.len,
1045 		     res->lockname.name);
1046 		goto recheck;
1047 	} else {
1048 		if (!voting_done) {
1049 			mlog(0, "map not changed and voting not done "
1050 			     "for %s:%.*s\n", dlm->name, res->lockname.len,
1051 			     res->lockname.name);
1052 		}
1053 	}
1054 
1055 	if (m != O2NM_MAX_NODES) {
1056 		/* another node has done an assert!
1057 		 * all done! */
1058 		sleep = 0;
1059 	} else {
1060 		sleep = 1;
1061 		/* have all nodes responded? */
1062 		if (voting_done && !*blocked) {
1063 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1064 			if (dlm->node_num <= bit) {
1065 				/* my node number is lowest.
1066 				 * now tell other nodes that I am
1067 				 * mastering this. */
1068 				mle->master = dlm->node_num;
1069 				/* ref was grabbed in get_lock_resource
1070 				 * will be dropped in dlmlock_master */
1071 				assert = 1;
1072 				sleep = 0;
1073 			}
1074 			/* if voting is done, but we have not received
1075 			 * an assert master yet, we must sleep */
1076 		}
1077 	}
1078 
1079 	spin_unlock(&mle->spinlock);
1080 
1081 	/* sleep if we haven't finished voting yet */
1082 	if (sleep) {
1083 		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1084 
1085 		/*
1086 		if (atomic_read(&mle->mle_refs.refcount) < 2)
1087 			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1088 			atomic_read(&mle->mle_refs.refcount),
1089 			res->lockname.len, res->lockname.name);
1090 		*/
1091 		atomic_set(&mle->woken, 0);
1092 		(void)wait_event_timeout(mle->wq,
1093 					 (atomic_read(&mle->woken) == 1),
1094 					 timeo);
1095 		if (res->owner == O2NM_MAX_NODES) {
1096 			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1097 			     res->lockname.len, res->lockname.name);
1098 			goto recheck;
1099 		}
1100 		mlog(0, "done waiting, master is %u\n", res->owner);
1101 		ret = 0;
1102 		goto leave;
1103 	}
1104 
1105 	ret = 0;   /* done */
1106 	if (assert) {
1107 		m = dlm->node_num;
1108 		mlog(0, "about to master %.*s here, this=%u\n",
1109 		     res->lockname.len, res->lockname.name, m);
1110 		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1111 		if (ret) {
1112 			/* This is a failure in the network path,
1113 			 * not in the response to the assert_master
1114 			 * (any nonzero response is a BUG on this node).
1115 			 * Most likely a socket just got disconnected
1116 			 * due to node death. */
1117 			mlog_errno(ret);
1118 		}
1119 		/* no longer need to restart lock mastery.
1120 		 * all living nodes have been contacted. */
1121 		ret = 0;
1122 	}
1123 
1124 	/* set the lockres owner */
1125 	spin_lock(&res->spinlock);
1126 	/* mastery reference obtained either during
1127 	 * assert_master_handler or in get_lock_resource */
1128 	dlm_change_lockres_owner(dlm, res, m);
1129 	spin_unlock(&res->spinlock);
1130 
1131 leave:
1132 	return ret;
1133 }
1134 
1135 struct dlm_bitmap_diff_iter
1136 {
1137 	int curnode;
1138 	unsigned long *orig_bm;
1139 	unsigned long *cur_bm;
1140 	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1141 };
1142 
1143 enum dlm_node_state_change
1144 {
1145 	NODE_DOWN = -1,
1146 	NODE_NO_CHANGE = 0,
1147 	NODE_UP
1148 };
1149 
1150 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1151 				      unsigned long *orig_bm,
1152 				      unsigned long *cur_bm)
1153 {
1154 	unsigned long p1, p2;
1155 	int i;
1156 
1157 	iter->curnode = -1;
1158 	iter->orig_bm = orig_bm;
1159 	iter->cur_bm = cur_bm;
1160 
1161 	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1162 		p1 = *(iter->orig_bm + i);
1163 		p2 = *(iter->cur_bm + i);
1164 		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1165 	}
1166 }
1167 
1168 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1169 				     enum dlm_node_state_change *state)
1170 {
1171 	int bit;
1172 
1173 	if (iter->curnode >= O2NM_MAX_NODES)
1174 		return -ENOENT;
1175 
1176 	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1177 			    iter->curnode+1);
1178 	if (bit >= O2NM_MAX_NODES) {
1179 		iter->curnode = O2NM_MAX_NODES;
1180 		return -ENOENT;
1181 	}
1182 
1183 	/* if it was there in the original then this node died */
1184 	if (test_bit(bit, iter->orig_bm))
1185 		*state = NODE_DOWN;
1186 	else
1187 		*state = NODE_UP;
1188 
1189 	iter->curnode = bit;
1190 	return bit;
1191 }
1192 
1193 
1194 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1195 				    struct dlm_lock_resource *res,
1196 				    struct dlm_master_list_entry *mle,
1197 				    int blocked)
1198 {
1199 	struct dlm_bitmap_diff_iter bdi;
1200 	enum dlm_node_state_change sc;
1201 	int node;
1202 	int ret = 0;
1203 
1204 	mlog(0, "something happened such that the "
1205 	     "master process may need to be restarted!\n");
1206 
1207 	assert_spin_locked(&mle->spinlock);
1208 
1209 	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1210 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1211 	while (node >= 0) {
1212 		if (sc == NODE_UP) {
1213 			/* a node came up.  clear any old vote from
1214 			 * the response map and set it in the vote map
1215 			 * then restart the mastery. */
1216 			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1217 
1218 			/* redo the master request, but only for the new node */
1219 			mlog(0, "sending request to new node\n");
1220 			clear_bit(node, mle->response_map);
1221 			set_bit(node, mle->vote_map);
1222 		} else {
1223 			mlog(ML_ERROR, "node down! %d\n", node);
1224 			if (blocked) {
1225 				int lowest = find_next_bit(mle->maybe_map,
1226 						       O2NM_MAX_NODES, 0);
1227 
1228 				/* act like it was never there */
1229 				clear_bit(node, mle->maybe_map);
1230 
1231 				if (node == lowest) {
1232 					mlog(0, "expected master %u died"
1233 					    " while this node was blocked "
1234 					    "waiting on it!\n", node);
1235 					lowest = find_next_bit(mle->maybe_map,
1236 						       	O2NM_MAX_NODES,
1237 						       	lowest+1);
1238 					if (lowest < O2NM_MAX_NODES) {
1239 						mlog(0, "%s:%.*s:still "
1240 						     "blocked. waiting on %u "
1241 						     "now\n", dlm->name,
1242 						     res->lockname.len,
1243 						     res->lockname.name,
1244 						     lowest);
1245 					} else {
1246 						/* mle is an MLE_BLOCK, but
1247 						 * there is now nothing left to
1248 						 * block on.  we need to return
1249 						 * all the way back out and try
1250 						 * again with an MLE_MASTER.
1251 						 * dlm_do_local_recovery_cleanup
1252 						 * has already run, so the mle
1253 						 * refcount is ok */
1254 						mlog(0, "%s:%.*s: no "
1255 						     "longer blocking. try to "
1256 						     "master this here\n",
1257 						     dlm->name,
1258 						     res->lockname.len,
1259 						     res->lockname.name);
1260 						mle->type = DLM_MLE_MASTER;
1261 						mle->mleres = res;
1262 					}
1263 				}
1264 			}
1265 
1266 			/* now blank out everything, as if we had never
1267 			 * contacted anyone */
1268 			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1269 			memset(mle->response_map, 0, sizeof(mle->response_map));
1270 			/* reset the vote_map to the current node_map */
1271 			memcpy(mle->vote_map, mle->node_map,
1272 			       sizeof(mle->node_map));
1273 			/* put myself into the maybe map */
1274 			if (mle->type != DLM_MLE_BLOCK)
1275 				set_bit(dlm->node_num, mle->maybe_map);
1276 		}
1277 		ret = -EAGAIN;
1278 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1279 	}
1280 	return ret;
1281 }
1282 
1283 
1284 /*
1285  * DLM_MASTER_REQUEST_MSG
1286  *
1287  * returns: 0 on success,
1288  *          -errno on a network error
1289  *
1290  * on error, the caller should assume the target node is "dead"
1291  *
1292  */
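
/*
 * Hedged caller sketch (editor's addition): a negative return is treated
 * as a dead link rather than a fatal error; the caller simply logs it and
 * lets the vote map and recovery logic deal with the node, as done in
 * dlm_get_lock_resource() above.
 *
 *	ret = dlm_do_master_request(res, mle, nodenum);
 *	if (ret < 0)
 *		mlog_errno(ret);	(target presumed dead)
 */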
1293 
1294 static int dlm_do_master_request(struct dlm_lock_resource *res,
1295 				 struct dlm_master_list_entry *mle, int to)
1296 {
1297 	struct dlm_ctxt *dlm = mle->dlm;
1298 	struct dlm_master_request request;
1299 	int ret, response=0, resend;
1300 
1301 	memset(&request, 0, sizeof(request));
1302 	request.node_idx = dlm->node_num;
1303 
1304 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1305 
1306 	request.namelen = (u8)mle->mnamelen;
1307 	memcpy(request.name, mle->mname, request.namelen);
1308 
1309 again:
1310 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1311 				 sizeof(request), to, &response);
1312 	if (ret < 0)  {
1313 		if (ret == -ESRCH) {
1314 			/* should never happen */
1315 			mlog(ML_ERROR, "TCP stack not ready!\n");
1316 			BUG();
1317 		} else if (ret == -EINVAL) {
1318 			mlog(ML_ERROR, "bad args passed to o2net!\n");
1319 			BUG();
1320 		} else if (ret == -ENOMEM) {
1321 			mlog(ML_ERROR, "out of memory while trying to send "
1322 			     "network message!  retrying\n");
1323 			/* this is totally crude */
1324 			msleep(50);
1325 			goto again;
1326 		} else if (!dlm_is_host_down(ret)) {
1327 			/* not a network error. bad. */
1328 			mlog_errno(ret);
1329 			mlog(ML_ERROR, "unhandled error!\n");
1330 			BUG();
1331 		}
1332 		/* all other errors should be network errors,
1333 		 * and likely indicate node death */
1334 		mlog(ML_ERROR, "link to %d went down!\n", to);
1335 		goto out;
1336 	}
1337 
1338 	ret = 0;
1339 	resend = 0;
1340 	spin_lock(&mle->spinlock);
1341 	switch (response) {
1342 		case DLM_MASTER_RESP_YES:
1343 			set_bit(to, mle->response_map);
1344 			mlog(0, "node %u is the master, response=YES\n", to);
1345 			mlog(0, "%s:%.*s: master node %u now knows I have a "
1346 			     "reference\n", dlm->name, res->lockname.len,
1347 			     res->lockname.name, to);
1348 			mle->master = to;
1349 			break;
1350 		case DLM_MASTER_RESP_NO:
1351 			mlog(0, "node %u not master, response=NO\n", to);
1352 			set_bit(to, mle->response_map);
1353 			break;
1354 		case DLM_MASTER_RESP_MAYBE:
1355 			mlog(0, "node %u not master, response=MAYBE\n", to);
1356 			set_bit(to, mle->response_map);
1357 			set_bit(to, mle->maybe_map);
1358 			break;
1359 		case DLM_MASTER_RESP_ERROR:
1360 			mlog(0, "node %u hit an error, resending\n", to);
1361 			resend = 1;
1362 			response = 0;
1363 			break;
1364 		default:
1365 			mlog(ML_ERROR, "bad response! %u\n", response);
1366 			BUG();
1367 	}
1368 	spin_unlock(&mle->spinlock);
1369 	if (resend) {
1370 		/* this is also totally crude */
1371 		msleep(50);
1372 		goto again;
1373 	}
1374 
1375 out:
1376 	return ret;
1377 }
1378 
1379 /*
1380  * locks that can be taken here:
1381  * dlm->spinlock
1382  * res->spinlock
1383  * mle->spinlock
1384  * dlm->master_list
1385  *
1386  * if possible, TRIM THIS DOWN!!!
1387  */
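
/*
 * For reference (editor's addition), the lock nesting this handler
 * actually uses, outermost first, is:
 *
 *	dlm->spinlock		(dropped again before res->spinlock is taken
 *				 when the lockres is found in the hash)
 *	res->spinlock
 *	dlm->master_lock
 *	mle->spinlock
 */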
1388 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1389 			       void **ret_data)
1390 {
1391 	u8 response = DLM_MASTER_RESP_MAYBE;
1392 	struct dlm_ctxt *dlm = data;
1393 	struct dlm_lock_resource *res = NULL;
1394 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1395 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1396 	char *name;
1397 	unsigned int namelen, hash;
1398 	int found, ret;
1399 	int set_maybe;
1400 	int dispatch_assert = 0;
1401 
1402 	if (!dlm_grab(dlm))
1403 		return DLM_MASTER_RESP_NO;
1404 
1405 	if (!dlm_domain_fully_joined(dlm)) {
1406 		response = DLM_MASTER_RESP_NO;
1407 		goto send_response;
1408 	}
1409 
1410 	name = request->name;
1411 	namelen = request->namelen;
1412 	hash = dlm_lockid_hash(name, namelen);
1413 
1414 	if (namelen > DLM_LOCKID_NAME_MAX) {
1415 		response = DLM_IVBUFLEN;
1416 		goto send_response;
1417 	}
1418 
1419 way_up_top:
1420 	spin_lock(&dlm->spinlock);
1421 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1422 	if (res) {
1423 		spin_unlock(&dlm->spinlock);
1424 
1425 		/* take care of the easy cases up front */
1426 		spin_lock(&res->spinlock);
1427 		if (res->state & (DLM_LOCK_RES_RECOVERING|
1428 				  DLM_LOCK_RES_MIGRATING)) {
1429 			spin_unlock(&res->spinlock);
1430 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1431 			     "being recovered/migrated\n");
1432 			response = DLM_MASTER_RESP_ERROR;
1433 			if (mle)
1434 				kmem_cache_free(dlm_mle_cache, mle);
1435 			goto send_response;
1436 		}
1437 
1438 		if (res->owner == dlm->node_num) {
1439 			mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1440 			     dlm->name, namelen, name, request->node_idx);
1441 			dlm_lockres_set_refmap_bit(request->node_idx, res);
1442 			spin_unlock(&res->spinlock);
1443 			response = DLM_MASTER_RESP_YES;
1444 			if (mle)
1445 				kmem_cache_free(dlm_mle_cache, mle);
1446 
1447 			/* this node is the owner.
1448 			 * there is some extra work that needs to
1449 			 * happen now.  the requesting node has
1450 			 * caused all nodes up to this one to
1451 			 * create mles.  this node now needs to
1452 			 * go back and clean those up. */
1453 			dispatch_assert = 1;
1454 			goto send_response;
1455 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1456 			spin_unlock(&res->spinlock);
1457 			// mlog(0, "node %u is the master\n", res->owner);
1458 			response = DLM_MASTER_RESP_NO;
1459 			if (mle)
1460 				kmem_cache_free(dlm_mle_cache, mle);
1461 			goto send_response;
1462 		}
1463 
1464 		/* ok, there is no owner.  either this node is
1465 		 * being blocked, or it is actively trying to
1466 		 * master this lock. */
1467 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1468 			mlog(ML_ERROR, "lock with no owner should be "
1469 			     "in-progress!\n");
1470 			BUG();
1471 		}
1472 
1473 		// mlog(0, "lockres is in progress...\n");
1474 		spin_lock(&dlm->master_lock);
1475 		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1476 		if (!found) {
1477 			mlog(ML_ERROR, "no mle found for this lock!\n");
1478 			BUG();
1479 		}
1480 		set_maybe = 1;
1481 		spin_lock(&tmpmle->spinlock);
1482 		if (tmpmle->type == DLM_MLE_BLOCK) {
1483 			// mlog(0, "this node is waiting for "
1484 			// "lockres to be mastered\n");
1485 			response = DLM_MASTER_RESP_NO;
1486 		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1487 			mlog(0, "node %u is master, but trying to migrate to "
1488 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1489 			if (tmpmle->master == dlm->node_num) {
1490 				mlog(ML_ERROR, "no owner on lockres, but this "
1491 				     "node is trying to migrate it to %u?!\n",
1492 				     tmpmle->new_master);
1493 				BUG();
1494 			} else {
1495 				/* the real master can respond on its own */
1496 				response = DLM_MASTER_RESP_NO;
1497 			}
1498 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1499 			set_maybe = 0;
1500 			if (tmpmle->master == dlm->node_num) {
1501 				response = DLM_MASTER_RESP_YES;
1502 				/* this node will be the owner.
1503 				 * go back and clean the mles on any
1504 				 * other nodes */
1505 				dispatch_assert = 1;
1506 				dlm_lockres_set_refmap_bit(request->node_idx, res);
1507 				mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1508 				     dlm->name, namelen, name,
1509 				     request->node_idx);
1510 			} else
1511 				response = DLM_MASTER_RESP_NO;
1512 		} else {
1513 			// mlog(0, "this node is attempting to "
1514 			// "master lockres\n");
1515 			response = DLM_MASTER_RESP_MAYBE;
1516 		}
1517 		if (set_maybe)
1518 			set_bit(request->node_idx, tmpmle->maybe_map);
1519 		spin_unlock(&tmpmle->spinlock);
1520 
1521 		spin_unlock(&dlm->master_lock);
1522 		spin_unlock(&res->spinlock);
1523 
1524 		/* keep the mle attached to heartbeat events */
1525 		dlm_put_mle(tmpmle);
1526 		if (mle)
1527 			kmem_cache_free(dlm_mle_cache, mle);
1528 		goto send_response;
1529 	}
1530 
1531 	/*
1532 	 * lockres doesn't exist on this node
1533 	 * if there is an MLE_BLOCK, return NO
1534 	 * if there is an MLE_MASTER, return MAYBE
1535 	 * otherwise, add an MLE_BLOCK, return NO
1536 	 */
1537 	spin_lock(&dlm->master_lock);
1538 	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1539 	if (!found) {
1540 		/* this lockid has never been seen on this node yet */
1541 		// mlog(0, "no mle found\n");
1542 		if (!mle) {
1543 			spin_unlock(&dlm->master_lock);
1544 			spin_unlock(&dlm->spinlock);
1545 
1546 			mle = (struct dlm_master_list_entry *)
1547 				kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1548 			if (!mle) {
1549 				response = DLM_MASTER_RESP_ERROR;
1550 				mlog_errno(-ENOMEM);
1551 				goto send_response;
1552 			}
1553 			goto way_up_top;
1554 		}
1555 
1556 		// mlog(0, "this is second time thru, already allocated, "
1557 		// "add the block.\n");
1558 		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1559 		set_bit(request->node_idx, mle->maybe_map);
1560 		__dlm_insert_mle(dlm, mle);
1561 		response = DLM_MASTER_RESP_NO;
1562 	} else {
1563 		// mlog(0, "mle was found\n");
1564 		set_maybe = 1;
1565 		spin_lock(&tmpmle->spinlock);
1566 		if (tmpmle->master == dlm->node_num) {
1567 			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1568 			BUG();
1569 		}
1570 		if (tmpmle->type == DLM_MLE_BLOCK)
1571 			response = DLM_MASTER_RESP_NO;
1572 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1573 			mlog(0, "migration mle was found (%u->%u)\n",
1574 			     tmpmle->master, tmpmle->new_master);
1575 			/* real master can respond on its own */
1576 			response = DLM_MASTER_RESP_NO;
1577 		} else
1578 			response = DLM_MASTER_RESP_MAYBE;
1579 		if (set_maybe)
1580 			set_bit(request->node_idx, tmpmle->maybe_map);
1581 		spin_unlock(&tmpmle->spinlock);
1582 	}
1583 	spin_unlock(&dlm->master_lock);
1584 	spin_unlock(&dlm->spinlock);
1585 
1586 	if (found) {
1587 		/* keep the mle attached to heartbeat events */
1588 		dlm_put_mle(tmpmle);
1589 	}
1590 send_response:
1591 	/*
1592 	 * __dlm_lookup_lockres() grabbed a reference to this lockres.
1593 	 * The reference is released by dlm_assert_master_worker() under
1594 	 * the call to dlm_dispatch_assert_master().  If
1595 	 * dlm_assert_master_worker() isn't called, we drop it here.
1596 	 */
1597 	if (dispatch_assert) {
1598 		if (response != DLM_MASTER_RESP_YES)
1599 			mlog(ML_ERROR, "invalid response %d\n", response);
1600 		if (!res) {
1601 			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1602 			BUG();
1603 		}
1604 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1605 			     dlm->node_num, res->lockname.len, res->lockname.name);
1606 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1607 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1608 		if (ret < 0) {
1609 			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1610 			response = DLM_MASTER_RESP_ERROR;
1611 			dlm_lockres_put(res);
1612 		}
1613 	} else {
1614 		if (res)
1615 			dlm_lockres_put(res);
1616 	}
1617 
1618 	dlm_put(dlm);
1619 	return response;
1620 }
1621 
1622 /*
1623  * DLM_ASSERT_MASTER_MSG
1624  */
1625 
1626 
1627 /*
1628  * NOTE: this can be used for debugging
1629  * can periodically run all locks owned by this node
1630  * and re-assert across the cluster...
1631  */
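
/*
 * Rough debug sketch (editor's addition, never called): re-asserting one
 * locally owned lockres across all other live nodes could look like this,
 * using the domain map as the target nodemap.
 *
 *	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 *	int ret;
 *
 *	spin_lock(&dlm->spinlock);
 *	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
 *	spin_unlock(&dlm->spinlock);
 *	clear_bit(dlm->node_num, nodemap);
 *
 *	ret = dlm_do_assert_master(dlm, res, nodemap, 0);
 *	if (ret < 0)
 *		mlog_errno(ret);	(a node died mid-assert; not fatal)
 */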
1632 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1633 				struct dlm_lock_resource *res,
1634 				void *nodemap, u32 flags)
1635 {
1636 	struct dlm_assert_master assert;
1637 	int to, tmpret;
1638 	struct dlm_node_iter iter;
1639 	int ret = 0;
1640 	int reassert;
1641 	const char *lockname = res->lockname.name;
1642 	unsigned int namelen = res->lockname.len;
1643 
1644 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1645 
1646 	spin_lock(&res->spinlock);
1647 	res->state |= DLM_LOCK_RES_SETREF_INPROG;
1648 	spin_unlock(&res->spinlock);
1649 
1650 again:
1651 	reassert = 0;
1652 
1653 	/* note that if this nodemap is empty, it returns 0 */
1654 	dlm_node_iter_init(nodemap, &iter);
1655 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1656 		int r = 0;
1657 		struct dlm_master_list_entry *mle = NULL;
1658 
1659 		mlog(0, "sending assert master to %d (%.*s)\n", to,
1660 		     namelen, lockname);
1661 		memset(&assert, 0, sizeof(assert));
1662 		assert.node_idx = dlm->node_num;
1663 		assert.namelen = namelen;
1664 		memcpy(assert.name, lockname, namelen);
1665 		assert.flags = cpu_to_be32(flags);
1666 
1667 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1668 					    &assert, sizeof(assert), to, &r);
1669 		if (tmpret < 0) {
1670 			mlog(0, "assert_master returned %d!\n", tmpret);
1671 			if (!dlm_is_host_down(tmpret)) {
1672 				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1673 				BUG();
1674 			}
1675 			/* a node died.  finish out the rest of the nodes. */
1676 			mlog(0, "link to %d went down!\n", to);
1677 			/* any nonzero status return will do */
1678 			ret = tmpret;
1679 			r = 0;
1680 		} else if (r < 0) {
1681 			/* ok, something is horribly messed up.  kill thyself. */
1682 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
1683 			     "got %d.\n", namelen, lockname, to, r);
1684 			spin_lock(&dlm->spinlock);
1685 			spin_lock(&dlm->master_lock);
1686 			if (dlm_find_mle(dlm, &mle, (char *)lockname,
1687 					 namelen)) {
1688 				dlm_print_one_mle(mle);
1689 				__dlm_put_mle(mle);
1690 			}
1691 			spin_unlock(&dlm->master_lock);
1692 			spin_unlock(&dlm->spinlock);
1693 			BUG();
1694 		}
1695 
1696 		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1697 		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1698 				mlog(ML_ERROR, "%.*s: very strange, "
1699 				     "master MLE but no lockres on %u\n",
1700 				     namelen, lockname, to);
1701 		}
1702 
1703 		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1704 			mlog(0, "%.*s: node %u created mles on other "
1705 			     "nodes and requests a re-assert\n",
1706 			     namelen, lockname, to);
1707 			reassert = 1;
1708 		}
1709 		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1710 			mlog(0, "%.*s: node %u has a reference to this "
1711 			     "lockres, set the bit in the refmap\n",
1712 			     namelen, lockname, to);
1713 			spin_lock(&res->spinlock);
1714 			dlm_lockres_set_refmap_bit(to, res);
1715 			spin_unlock(&res->spinlock);
1716 		}
1717 	}
1718 
1719 	if (reassert)
1720 		goto again;
1721 
1722 	spin_lock(&res->spinlock);
1723 	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1724 	spin_unlock(&res->spinlock);
1725 	wake_up(&res->wq);
1726 
1727 	return ret;
1728 }
1729 
1730 /*
1731  * locks that can be taken here:
1732  * dlm->spinlock
1733  * res->spinlock
1734  * mle->spinlock
1735  * dlm->master_list
1736  *
1737  * if possible, TRIM THIS DOWN!!!
1738  */
1739 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1740 			      void **ret_data)
1741 {
1742 	struct dlm_ctxt *dlm = data;
1743 	struct dlm_master_list_entry *mle = NULL;
1744 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1745 	struct dlm_lock_resource *res = NULL;
1746 	char *name;
1747 	unsigned int namelen, hash;
1748 	u32 flags;
1749 	int master_request = 0, have_lockres_ref = 0;
1750 	int ret = 0;
1751 
1752 	if (!dlm_grab(dlm))
1753 		return 0;
1754 
1755 	name = assert->name;
1756 	namelen = assert->namelen;
1757 	hash = dlm_lockid_hash(name, namelen);
1758 	flags = be32_to_cpu(assert->flags);
1759 
1760 	if (namelen > DLM_LOCKID_NAME_MAX) {
1761 		mlog(ML_ERROR, "Invalid name length!\n");
1762 		goto done;
1763 	}
1764 
1765 	spin_lock(&dlm->spinlock);
1766 
1767 	if (flags)
1768 		mlog(0, "assert_master with flags: %u\n", flags);
1769 
1770 	/* find the MLE */
1771 	spin_lock(&dlm->master_lock);
1772 	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1773 		/* not an error, could be master just re-asserting */
1774 		mlog(0, "just got an assert_master from %u, but no "
1775 		     "MLE for it! (%.*s)\n", assert->node_idx,
1776 		     namelen, name);
1777 	} else {
1778 		int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1779 		if (bit >= O2NM_MAX_NODES) {
1780 			/* not necessarily an error, though less likely.
1781 			 * could be master just re-asserting. */
1782 			mlog(0, "no bits set in the maybe_map, but %u "
1783 			     "is asserting! (%.*s)\n", assert->node_idx,
1784 			     namelen, name);
1785 		} else if (bit != assert->node_idx) {
1786 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1787 				mlog(0, "master %u was found, %u should "
1788 				     "back off\n", assert->node_idx, bit);
1789 			} else {
1790 				/* with the fix for bug 569, a higher node
1791 				 * number winning the mastery will respond
1792 				 * YES to mastery requests, but this node
1793 				 * had no way of knowing.  let it pass. */
1794 				mlog(0, "%u is the lowest node, "
1795 				     "%u is asserting. (%.*s)  %u must "
1796 				     "have begun after %u won.\n", bit,
1797 				     assert->node_idx, namelen, name, bit,
1798 				     assert->node_idx);
1799 			}
1800 		}
1801 		if (mle->type == DLM_MLE_MIGRATION) {
1802 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1803 				mlog(0, "%s:%.*s: got cleanup assert"
1804 				     " from %u for migration\n",
1805 				     dlm->name, namelen, name,
1806 				     assert->node_idx);
1807 			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1808 				mlog(0, "%s:%.*s: got unrelated assert"
1809 				     " from %u for migration, ignoring\n",
1810 				     dlm->name, namelen, name,
1811 				     assert->node_idx);
1812 				__dlm_put_mle(mle);
1813 				spin_unlock(&dlm->master_lock);
1814 				spin_unlock(&dlm->spinlock);
1815 				goto done;
1816 			}
1817 		}
1818 	}
1819 	spin_unlock(&dlm->master_lock);
1820 
1821 	/* ok everything checks out with the MLE
1822 	 * now check to see if there is a lockres */
1823 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1824 	if (res) {
1825 		spin_lock(&res->spinlock);
1826 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1827 			mlog(ML_ERROR, "%u asserting but %.*s is "
1828 			     "RECOVERING!\n", assert->node_idx, namelen, name);
1829 			goto kill;
1830 		}
1831 		if (!mle) {
1832 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1833 			    res->owner != assert->node_idx) {
1834 				mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1835 				     "but current owner is %u! (%.*s)\n",
1836 				     assert->node_idx, res->owner, namelen,
1837 				     name);
1838 				__dlm_print_one_lock_resource(res);
1839 				BUG();
1840 			}
1841 		} else if (mle->type != DLM_MLE_MIGRATION) {
1842 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1843 				/* owner is just re-asserting */
1844 				if (res->owner == assert->node_idx) {
1845 					mlog(0, "owner %u re-asserting on "
1846 					     "lock %.*s\n", assert->node_idx,
1847 					     namelen, name);
1848 					goto ok;
1849 				}
1850 				mlog(ML_ERROR, "got assert_master from "
1851 				     "node %u, but %u is the owner! "
1852 				     "(%.*s)\n", assert->node_idx,
1853 				     res->owner, namelen, name);
1854 				goto kill;
1855 			}
1856 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1857 				mlog(ML_ERROR, "got assert from %u, but lock "
1858 				     "with no owner should be "
1859 				     "in-progress! (%.*s)\n",
1860 				     assert->node_idx,
1861 				     namelen, name);
1862 				goto kill;
1863 			}
1864 		} else /* mle->type == DLM_MLE_MIGRATION */ {
1865 			/* should only be getting an assert from new master */
1866 			if (assert->node_idx != mle->new_master) {
1867 				mlog(ML_ERROR, "got assert from %u, but "
1868 				     "new master is %u, and old master "
1869 				     "was %u (%.*s)\n",
1870 				     assert->node_idx, mle->new_master,
1871 				     mle->master, namelen, name);
1872 				goto kill;
1873 			}
1874 
1875 		}
1876 ok:
1877 		spin_unlock(&res->spinlock);
1878 	}
1879 	spin_unlock(&dlm->spinlock);
1880 
1881 	// mlog(0, "woo!  got an assert_master from node %u!\n",
1882 	// 	     assert->node_idx);
1883 	if (mle) {
1884 		int extra_ref = 0;
1885 		int nn = -1;
1886 		int rr, err = 0;
1887 
1888 		spin_lock(&mle->spinlock);
1889 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1890 			extra_ref = 1;
1891 		else {
1892 			/* MASTER mle: if any bits set in the response map
1893 			 * then the calling node needs to re-assert to clear
1894 			 * up nodes that this node contacted */
1895 			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1896 						    nn+1)) < O2NM_MAX_NODES) {
1897 				if (nn != dlm->node_num && nn != assert->node_idx)
1898 					master_request = 1;
1899 			}
1900 		}
1901 		mle->master = assert->node_idx;
1902 		atomic_set(&mle->woken, 1);
1903 		wake_up(&mle->wq);
1904 		spin_unlock(&mle->spinlock);
1905 
1906 		if (res) {
1907 			int wake = 0;
1908 			spin_lock(&res->spinlock);
1909 			if (mle->type == DLM_MLE_MIGRATION) {
1910 				mlog(0, "finishing off migration of lockres %.*s, "
1911 				     "from %u to %u\n",
1912 				     res->lockname.len, res->lockname.name,
1913 				     dlm->node_num, mle->new_master);
1914 				res->state &= ~DLM_LOCK_RES_MIGRATING;
1915 				wake = 1;
1916 				dlm_change_lockres_owner(dlm, res, mle->new_master);
1917 				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1918 			} else {
1919 				dlm_change_lockres_owner(dlm, res, mle->master);
1920 			}
1921 			spin_unlock(&res->spinlock);
1922 			have_lockres_ref = 1;
1923 			if (wake)
1924 				wake_up(&res->wq);
1925 		}
1926 
1927 		/* master is known, detach if not already detached.
1928 		 * ensures that only one assert_master call will happen
1929 		 * on this mle. */
1930 		spin_lock(&dlm->spinlock);
1931 		spin_lock(&dlm->master_lock);
1932 
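		/* Sanity check the mle refcount before dropping our refs:
		 * expect at least one baseline ref, plus one more if the
		 * mle is marked inuse, plus one more if this is a BLOCK or
		 * MIGRATION mle still carrying the extra request ref. */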
1933 		rr = atomic_read(&mle->mle_refs.refcount);
1934 		if (mle->inuse > 0) {
1935 			if (extra_ref && rr < 3)
1936 				err = 1;
1937 			else if (!extra_ref && rr < 2)
1938 				err = 1;
1939 		} else {
1940 			if (extra_ref && rr < 2)
1941 				err = 1;
1942 			else if (!extra_ref && rr < 1)
1943 				err = 1;
1944 		}
1945 		if (err) {
1946 			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1947 			     "that will mess up this node, refs=%d, extra=%d, "
1948 			     "inuse=%d\n", dlm->name, namelen, name,
1949 			     assert->node_idx, rr, extra_ref, mle->inuse);
1950 			dlm_print_one_mle(mle);
1951 		}
1952 		__dlm_unlink_mle(dlm, mle);
1953 		__dlm_mle_detach_hb_events(dlm, mle);
1954 		__dlm_put_mle(mle);
1955 		if (extra_ref) {
1956 			/* the assert master message now balances the extra
1957 			 * ref given by the master / migration request message.
1958 			 * if this is the last put, it will be removed
1959 			 * from the list. */
1960 			__dlm_put_mle(mle);
1961 		}
1962 		spin_unlock(&dlm->master_lock);
1963 		spin_unlock(&dlm->spinlock);
1964 	} else if (res) {
1965 		if (res->owner != assert->node_idx) {
1966 			mlog(0, "assert_master from %u, but current "
1967 			     "owner is %u (%.*s), no mle\n", assert->node_idx,
1968 			     res->owner, namelen, name);
1969 		}
1970 	}
1971 
1972 done:
1973 	ret = 0;
1974 	if (res) {
1975 		spin_lock(&res->spinlock);
1976 		res->state |= DLM_LOCK_RES_SETREF_INPROG;
1977 		spin_unlock(&res->spinlock);
1978 		*ret_data = (void *)res;
1979 	}
1980 	dlm_put(dlm);
1981 	if (master_request) {
1982 		mlog(0, "need to tell master to reassert\n");
1983 		/* positive. negative would shoot down the node. */
1984 		ret |= DLM_ASSERT_RESPONSE_REASSERT;
1985 		if (!have_lockres_ref) {
1986 			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
1987 			     "mle present here for %s:%.*s, but no lockres!\n",
1988 			     assert->node_idx, dlm->name, namelen, name);
1989 		}
1990 	}
1991 	if (have_lockres_ref) {
1992 		/* let the master know we have a reference to the lockres */
1993 		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
1994 		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
1995 		     dlm->name, namelen, name, assert->node_idx);
1996 	}
1997 	return ret;
1998 
1999 kill:
2000 	/* kill the caller! */
2001 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2002 	     "and killing the other node now!  This node is OK and can continue.\n");
2003 	__dlm_print_one_lock_resource(res);
2004 	spin_unlock(&res->spinlock);
2005 	spin_unlock(&dlm->spinlock);
2006 	*ret_data = (void *)res;
2007 	dlm_put(dlm);
2008 	return -EINVAL;
2009 }
2010 
2011 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2012 {
2013 	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2014 
2015 	if (ret_data) {
2016 		spin_lock(&res->spinlock);
2017 		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2018 		spin_unlock(&res->spinlock);
2019 		wake_up(&res->wq);
2020 		dlm_lockres_put(res);
2021 	}
2022 	return;
2023 }
2024 
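/*
 * Queue an assert_master pass for res on the dlm worker thread.  The
 * caller hands over its reference on res to the work item; the actual
 * message fan-out happens later in dlm_assert_master_worker().
 */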
2025 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2026 			       struct dlm_lock_resource *res,
2027 			       int ignore_higher, u8 request_from, u32 flags)
2028 {
2029 	struct dlm_work_item *item;
2030 	item = kzalloc(sizeof(*item), GFP_NOFS);
2031 	if (!item)
2032 		return -ENOMEM;
2033 
2034 
2035 	/* queue up work for dlm_assert_master_worker */
2036 	dlm_grab(dlm);  /* get an extra ref for the work item */
2037 	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2038 	item->u.am.lockres = res; /* already have a ref */
2039 	/* can optionally ignore node numbers higher than this node */
2040 	item->u.am.ignore_higher = ignore_higher;
2041 	item->u.am.request_from = request_from;
2042 	item->u.am.flags = flags;
2043 
2044 	if (ignore_higher)
2045 		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2046 		     res->lockname.name);
2047 
2048 	spin_lock(&dlm->work_lock);
2049 	list_add_tail(&item->list, &dlm->work_list);
2050 	spin_unlock(&dlm->work_lock);
2051 
2052 	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2053 	return 0;
2054 }
2055 
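/*
 * Work-queue side of dlm_dispatch_assert_master(): sends assert_master
 * to every live node in the domain (optionally skipping the requester
 * and all higher-numbered nodes), while holding a reserved ast so that
 * migration cannot begin until the asserts are finished.
 */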
2056 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2057 {
2058 	struct dlm_ctxt *dlm = data;
2059 	int ret = 0;
2060 	struct dlm_lock_resource *res;
2061 	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2062 	int ignore_higher;
2063 	int bit;
2064 	u8 request_from;
2065 	u32 flags;
2066 
2067 	dlm = item->dlm;
2068 	res = item->u.am.lockres;
2069 	ignore_higher = item->u.am.ignore_higher;
2070 	request_from = item->u.am.request_from;
2071 	flags = item->u.am.flags;
2072 
2073 	spin_lock(&dlm->spinlock);
2074 	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2075 	spin_unlock(&dlm->spinlock);
2076 
2077 	clear_bit(dlm->node_num, nodemap);
2078 	if (ignore_higher) {
2079 		/* if this is just to clear up mles for nodes below
2080 		 * this node, do not send the message to the original
2081 		 * caller or any node number higher than this */
2082 		clear_bit(request_from, nodemap);
2083 		bit = dlm->node_num;
2084 		while (1) {
2085 			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2086 					    bit+1);
2087 			if (bit >= O2NM_MAX_NODES)
2088 				break;
2089 			clear_bit(bit, nodemap);
2090 		}
2091 	}
2092 
2093 	/*
2094 	 * If we're migrating this lock to someone else, we are no
2095 	 * longer allowed to assert our own mastery.  OTOH, we need to
2096 	 * prevent migration from starting while we're still asserting
2097 	 * our dominance.  The reserved ast delays migration.
2098 	 */
2099 	spin_lock(&res->spinlock);
2100 	if (res->state & DLM_LOCK_RES_MIGRATING) {
2101 		mlog(0, "Someone asked us to assert mastery, but we're "
2102 		     "in the middle of migration.  Skipping assert, "
2103 		     "the new master will handle that.\n");
2104 		spin_unlock(&res->spinlock);
2105 		goto put;
2106 	} else
2107 		__dlm_lockres_reserve_ast(res);
2108 	spin_unlock(&res->spinlock);
2109 
2110 	/* this call now finishes out the nodemap
2111 	 * even if one or more nodes die */
2112 	mlog(0, "worker about to master %.*s here, this=%u\n",
2113 		     res->lockname.len, res->lockname.name, dlm->node_num);
2114 	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2115 	if (ret < 0) {
2116 		/* no need to restart, we are done */
2117 		if (!dlm_is_host_down(ret))
2118 			mlog_errno(ret);
2119 	}
2120 
2121 	/* Ok, we've asserted ourselves.  Let's let migration start. */
2122 	dlm_lockres_release_ast(dlm, res);
2123 
2124 put:
2125 	dlm_lockres_put(res);
2126 
2127 	mlog(0, "finished with dlm_assert_master_worker\n");
2128 }
2129 
2130 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2131  * We cannot wait for node recovery to complete to begin mastering this
2132  * lockres because this lockres is used to kick off recovery! ;-)
2133  * So, do a pre-check on all living nodes to see if any of those nodes
2134  * think that $RECOVERY is currently mastered by a dead node.  If so,
2135  * we wait a short time to allow that node to get notified by its own
2136  * heartbeat stack, then check again.  All $RECOVERY lock resources
2137  * mastered by dead nodes are purged when the heartbeat callback is
2138  * fired, so we can know for sure that it is safe to continue once
2139  * the node returns a live node or no node.  */
2140 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2141 				       struct dlm_lock_resource *res)
2142 {
2143 	struct dlm_node_iter iter;
2144 	int nodenum;
2145 	int ret = 0;
2146 	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2147 
2148 	spin_lock(&dlm->spinlock);
2149 	dlm_node_iter_init(dlm->domain_map, &iter);
2150 	spin_unlock(&dlm->spinlock);
2151 
2152 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2153 		/* do not send to self */
2154 		if (nodenum == dlm->node_num)
2155 			continue;
2156 		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2157 		if (ret < 0) {
2158 			mlog_errno(ret);
2159 			if (!dlm_is_host_down(ret))
2160 				BUG();
2161 			/* host is down, so answer for that node would be
2162 			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2163 			ret = 0;
2164 		}
2165 
2166 		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2167 			/* check to see if this master is in the recovery map */
2168 			spin_lock(&dlm->spinlock);
2169 			if (test_bit(master, dlm->recovery_map)) {
2170 				mlog(ML_NOTICE, "%s: node %u has not seen "
2171 				     "node %u go down yet, and thinks the "
2172 				     "dead node is mastering the recovery "
2173 				     "lock.  must wait.\n", dlm->name,
2174 				     nodenum, master);
2175 				ret = -EAGAIN;
2176 			}
2177 			spin_unlock(&dlm->spinlock);
2178 			mlog(0, "%s: reco lock master is %u\n", dlm->name,
2179 			     master);
2180 			break;
2181 		}
2182 	}
2183 	return ret;
2184 }
2185 
2186 /*
2187  * DLM_DEREF_LOCKRES_MSG
2188  */
2189 
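/* Tell the lockres owner that this node no longer holds a reference,
 * so the owner can clear our bit in its refmap.  A negative status
 * from the owner means our ref was already gone, which is fatal. */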
2190 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2191 {
2192 	struct dlm_deref_lockres deref;
2193 	int ret = 0, r;
2194 	const char *lockname;
2195 	unsigned int namelen;
2196 
2197 	lockname = res->lockname.name;
2198 	namelen = res->lockname.len;
2199 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2200 
2201 	mlog(0, "%s:%.*s: sending deref to %d\n",
2202 	     dlm->name, namelen, lockname, res->owner);
2203 	memset(&deref, 0, sizeof(deref));
2204 	deref.node_idx = dlm->node_num;
2205 	deref.namelen = namelen;
2206 	memcpy(deref.name, lockname, namelen);
2207 
2208 	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2209 				 &deref, sizeof(deref), res->owner, &r);
2210 	if (ret < 0)
2211 		mlog_errno(ret);
2212 	else if (r < 0) {
2213 		/* BAD.  other node says I did not have a ref. */
2214 		mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2215 		    "(master=%u) got %d.\n", dlm->name, namelen,
2216 		    lockname, res->owner, r);
2217 		dlm_print_one_lock_resource(res);
2218 		BUG();
2219 	}
2220 	return ret;
2221 }
2222 
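/* Handler run on the lockres master when another node drops its ref.
 * If an assert_master reply is still marking the refmap (SETREF_INPROG),
 * defer the deref to dlm_deref_lockres_worker; otherwise clear the
 * sender's refmap bit right here and recalculate lockres usage. */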
2223 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2224 			      void **ret_data)
2225 {
2226 	struct dlm_ctxt *dlm = data;
2227 	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2228 	struct dlm_lock_resource *res = NULL;
2229 	char *name;
2230 	unsigned int namelen;
2231 	int ret = -EINVAL;
2232 	u8 node;
2233 	unsigned int hash;
2234 	struct dlm_work_item *item;
2235 	int cleared = 0;
2236 	int dispatch = 0;
2237 
2238 	if (!dlm_grab(dlm))
2239 		return 0;
2240 
2241 	name = deref->name;
2242 	namelen = deref->namelen;
2243 	node = deref->node_idx;
2244 
2245 	if (namelen > DLM_LOCKID_NAME_MAX) {
2246 		mlog(ML_ERROR, "Invalid name length!");
2247 		goto done;
2248 	}
2249 	if (deref->node_idx >= O2NM_MAX_NODES) {
2250 		mlog(ML_ERROR, "Invalid node number: %u\n", node);
2251 		goto done;
2252 	}
2253 
2254 	hash = dlm_lockid_hash(name, namelen);
2255 
2256 	spin_lock(&dlm->spinlock);
2257 	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2258 	if (!res) {
2259 		spin_unlock(&dlm->spinlock);
2260 		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2261 		     dlm->name, namelen, name);
2262 		goto done;
2263 	}
2264 	spin_unlock(&dlm->spinlock);
2265 
2266 	spin_lock(&res->spinlock);
2267 	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2268 		dispatch = 1;
2269 	else {
2270 		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2271 		if (test_bit(node, res->refmap)) {
2272 			dlm_lockres_clear_refmap_bit(node, res);
2273 			cleared = 1;
2274 		}
2275 	}
2276 	spin_unlock(&res->spinlock);
2277 
2278 	if (!dispatch) {
2279 		if (cleared)
2280 			dlm_lockres_calc_usage(dlm, res);
2281 		else {
2282 			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2283 			     "but it is already dropped!\n", dlm->name,
2284 			     res->lockname.len, res->lockname.name, node);
2285 			dlm_print_one_lock_resource(res);
2286 		}
2287 		ret = 0;
2288 		goto done;
2289 	}
2290 
2291 	item = kzalloc(sizeof(*item), GFP_NOFS);
2292 	if (!item) {
2293 		ret = -ENOMEM;
2294 		mlog_errno(ret);
2295 		goto done;
2296 	}
2297 
2298 	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2299 	item->u.dl.deref_res = res;
2300 	item->u.dl.deref_node = node;
2301 
2302 	spin_lock(&dlm->work_lock);
2303 	list_add_tail(&item->list, &dlm->work_list);
2304 	spin_unlock(&dlm->work_lock);
2305 
2306 	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2307 	return 0;
2308 
2309 done:
2310 	if (res)
2311 		dlm_lockres_put(res);
2312 	dlm_put(dlm);
2313 
2314 	return ret;
2315 }
2316 
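/* Deferred deref path: wait for DLM_LOCK_RES_SETREF_INPROG to clear,
 * then drop the sender's refmap bit and recalculate lockres usage. */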
2317 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2318 {
2319 	struct dlm_ctxt *dlm;
2320 	struct dlm_lock_resource *res;
2321 	u8 node;
2322 	u8 cleared = 0;
2323 
2324 	dlm = item->dlm;
2325 	res = item->u.dl.deref_res;
2326 	node = item->u.dl.deref_node;
2327 
2328 	spin_lock(&res->spinlock);
2329 	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2330 	if (test_bit(node, res->refmap)) {
2331 		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2332 		dlm_lockres_clear_refmap_bit(node, res);
2333 		cleared = 1;
2334 	}
2335 	spin_unlock(&res->spinlock);
2336 
2337 	if (cleared) {
2338 		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2339 		     dlm->name, res->lockname.len, res->lockname.name, node);
2340 		dlm_lockres_calc_usage(dlm, res);
2341 	} else {
2342 		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2343 		     "but it is already dropped!\n", dlm->name,
2344 		     res->lockname.len, res->lockname.name, node);
2345 		dlm_print_one_lock_resource(res);
2346 	}
2347 
2348 	dlm_lockres_put(res);
2349 }
2350 
2351 /* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
2352  * if not. If 0, numlocks is set to the number of locks in the lockres.
2353  */
2354 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2355 				      struct dlm_lock_resource *res,
2356 				      int *numlocks)
2357 {
2358 	int ret;
2359 	int i;
2360 	int count = 0;
2361 	struct list_head *queue;
2362 	struct dlm_lock *lock;
2363 
2364 	assert_spin_locked(&res->spinlock);
2365 
2366 	ret = -EINVAL;
2367 	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2368 		mlog(0, "cannot migrate lockres with unknown owner!\n");
2369 		goto leave;
2370 	}
2371 
2372 	if (res->owner != dlm->node_num) {
2373 		mlog(0, "cannot migrate lockres this node doesn't own!\n");
2374 		goto leave;
2375 	}
2376 
2377 	ret = 0;
2378 	queue = &res->granted;
2379 	for (i = 0; i < 3; i++) {
2380 		list_for_each_entry(lock, queue, list) {
2381 			++count;
2382 			if (lock->ml.node == dlm->node_num) {
2383 				mlog(0, "found a lock owned by this node still "
2384 				     "on the %s queue!  will not migrate this "
2385 				     "lockres\n", (i == 0 ? "granted" :
2386 						   (i == 1 ? "converting" :
2387 						    "blocked")));
2388 				ret = -ENOTEMPTY;
2389 				goto leave;
2390 			}
2391 		}
2392 		queue++;
2393 	}
2394 
2395 	*numlocks = count;
2396 	mlog(0, "migrateable lockres having %d locks\n", *numlocks);
2397 
2398 leave:
2399 	return ret;
2400 }
2401 
2402 /*
2403  * DLM_MIGRATE_LOCKRES
2404  */
2405 
2406 
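/* Migrate mastery of a lockres from this node to target (or to a node
 * picked by dlm_pick_migration_target() if target is invalid).  On
 * success the new master owns the lockres and all lock structures for
 * other nodes have been freed locally. */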
2407 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2408 			       struct dlm_lock_resource *res,
2409 			       u8 target)
2410 {
2411 	struct dlm_master_list_entry *mle = NULL;
2412 	struct dlm_master_list_entry *oldmle = NULL;
2413 	struct dlm_migratable_lockres *mres = NULL;
2414 	int ret = 0;
2415 	const char *name;
2416 	unsigned int namelen;
2417 	int mle_added = 0;
2418 	int numlocks;
2419 	int wake = 0;
2420 
2421 	if (!dlm_grab(dlm))
2422 		return -EINVAL;
2423 
2424 	name = res->lockname.name;
2425 	namelen = res->lockname.len;
2426 
2427 	mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2428 
2429 	/*
2430 	 * ensure this lockres is a proper candidate for migration
2431 	 */
2432 	spin_lock(&res->spinlock);
2433 	ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2434 	if (ret < 0) {
2435 		spin_unlock(&res->spinlock);
2436 		goto leave;
2437 	}
2438 	spin_unlock(&res->spinlock);
2439 
2440 	/* no work to do */
2441 	if (numlocks == 0) {
2442 		mlog(0, "no locks were found on this lockres! done!\n");
2443 		goto leave;
2444 	}
2445 
2446 	/*
2447 	 * preallocate up front
2448 	 * if this fails, abort
2449 	 */
2450 
2451 	ret = -ENOMEM;
2452 	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2453 	if (!mres) {
2454 		mlog_errno(ret);
2455 		goto leave;
2456 	}
2457 
2458 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2459 								GFP_NOFS);
2460 	if (!mle) {
2461 		mlog_errno(ret);
2462 		goto leave;
2463 	}
2464 	ret = 0;
2465 
2466 	/*
2467 	 * find a node to migrate the lockres to
2468 	 */
2469 
2470 	mlog(0, "picking a migration node\n");
2471 	spin_lock(&dlm->spinlock);
2472 	/* pick a new node */
2473 	if (!test_bit(target, dlm->domain_map) ||
2474 	    target >= O2NM_MAX_NODES) {
2475 		target = dlm_pick_migration_target(dlm, res);
2476 	}
2477 	mlog(0, "node %u chosen for migration\n", target);
2478 
2479 	if (target >= O2NM_MAX_NODES ||
2480 	    !test_bit(target, dlm->domain_map)) {
2481 		/* target chosen is not alive */
2482 		ret = -EINVAL;
2483 	}
2484 
2485 	if (ret) {
2486 		spin_unlock(&dlm->spinlock);
2487 		goto fail;
2488 	}
2489 
2490 	mlog(0, "continuing with target = %u\n", target);
2491 
2492 	/*
2493 	 * clear any existing master requests and
2494 	 * add the migration mle to the list
2495 	 */
2496 	spin_lock(&dlm->master_lock);
2497 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2498 				    namelen, target, dlm->node_num);
2499 	spin_unlock(&dlm->master_lock);
2500 	spin_unlock(&dlm->spinlock);
2501 
2502 	if (ret == -EEXIST) {
2503 		mlog(0, "another process is already migrating it\n");
2504 		goto fail;
2505 	}
2506 	mle_added = 1;
2507 
2508 	/*
2509 	 * set the MIGRATING flag and flush asts
2510 	 * if we fail after this we need to re-dirty the lockres
2511 	 */
2512 	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2513 		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2514 		     "the target went down.\n", res->lockname.len,
2515 		     res->lockname.name, target);
2516 		spin_lock(&res->spinlock);
2517 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2518 		wake = 1;
2519 		spin_unlock(&res->spinlock);
2520 		ret = -EINVAL;
2521 	}
2522 
2523 fail:
2524 	if (oldmle) {
2525 		/* master is known, detach if not already detached */
2526 		dlm_mle_detach_hb_events(dlm, oldmle);
2527 		dlm_put_mle(oldmle);
2528 	}
2529 
2530 	if (ret < 0) {
2531 		if (mle_added) {
2532 			dlm_mle_detach_hb_events(dlm, mle);
2533 			dlm_put_mle(mle);
2534 		} else if (mle) {
2535 			kmem_cache_free(dlm_mle_cache, mle);
2536 		}
2537 		goto leave;
2538 	}
2539 
2540 	/*
2541 	 * at this point, we have a migration target, an mle
2542 	 * in the master list, and the MIGRATING flag set on
2543 	 * the lockres
2544 	 */
2545 
2546 	/* now that remote nodes are spinning on the MIGRATING flag,
2547 	 * ensure that all assert_master work is flushed. */
2548 	flush_workqueue(dlm->dlm_worker);
2549 
2550 	/* get an extra reference on the mle.
2551 	 * otherwise the assert_master from the new
2552 	 * master will destroy this.
2553 	 * also, make sure that all callers of dlm_get_mle
2554 	 * take both dlm->spinlock and dlm->master_lock */
2555 	spin_lock(&dlm->spinlock);
2556 	spin_lock(&dlm->master_lock);
2557 	dlm_get_mle_inuse(mle);
2558 	spin_unlock(&dlm->master_lock);
2559 	spin_unlock(&dlm->spinlock);
2560 
2561 	/* notify new node and send all lock state */
2562 	/* call send_one_lockres with migration flag.
2563 	 * this serves as notice to the target node that a
2564 	 * migration is starting. */
2565 	ret = dlm_send_one_lockres(dlm, res, mres, target,
2566 				   DLM_MRES_MIGRATION);
2567 
2568 	if (ret < 0) {
2569 		mlog(0, "migration to node %u failed with %d\n",
2570 		     target, ret);
2571 		/* migration failed, detach and clean up mle */
2572 		dlm_mle_detach_hb_events(dlm, mle);
2573 		dlm_put_mle(mle);
2574 		dlm_put_mle_inuse(mle);
2575 		spin_lock(&res->spinlock);
2576 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2577 		wake = 1;
2578 		spin_unlock(&res->spinlock);
2579 		goto leave;
2580 	}
2581 
2582 	/* at this point, the target sends a message to all nodes,
2583 	 * (using dlm_do_migrate_request).  this node is skipped since
2584 	 * we had to put an mle in the list to begin the process.  this
2585 	 * node now waits for target to do an assert master.  this node
2586 	 * will be the last one notified, ensuring that the migration
2587 	 * is complete everywhere.  if the target dies while this is
2588 	 * going on, some nodes could potentially see the target as the
2589 	 * master, so it is important that my recovery finds the migration
2590  * mle and sets the master to UNKNOWN. */
2591 
2592 
2593 	/* wait for new node to assert master */
2594 	while (1) {
2595 		ret = wait_event_interruptible_timeout(mle->wq,
2596 					(atomic_read(&mle->woken) == 1),
2597 					msecs_to_jiffies(5000));
2598 
2599 		if (ret >= 0) {
2600 			if (atomic_read(&mle->woken) == 1 ||
2601 			    res->owner == target)
2602 				break;
2603 
2604 			mlog(0, "%s:%.*s: timed out during migration\n",
2605 			     dlm->name, res->lockname.len, res->lockname.name);
2606 			/* avoid hang during shutdown when migrating lockres
2607 			 * to a node which also goes down */
2608 			if (dlm_is_node_dead(dlm, target)) {
2609 				mlog(0, "%s:%.*s: expected migration "
2610 				     "target %u is no longer up, restarting\n",
2611 				     dlm->name, res->lockname.len,
2612 				     res->lockname.name, target);
2613 				ret = -EINVAL;
2614 				/* migration failed, detach and clean up mle */
2615 				dlm_mle_detach_hb_events(dlm, mle);
2616 				dlm_put_mle(mle);
2617 				dlm_put_mle_inuse(mle);
2618 				spin_lock(&res->spinlock);
2619 				res->state &= ~DLM_LOCK_RES_MIGRATING;
2620 				wake = 1;
2621 				spin_unlock(&res->spinlock);
2622 				goto leave;
2623 			}
2624 		} else
2625 			mlog(0, "%s:%.*s: caught signal during migration\n",
2626 			     dlm->name, res->lockname.len, res->lockname.name);
2627 	}
2628 
2629 	/* all done, set the owner, clear the flag */
2630 	spin_lock(&res->spinlock);
2631 	dlm_set_lockres_owner(dlm, res, target);
2632 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2633 	dlm_remove_nonlocal_locks(dlm, res);
2634 	spin_unlock(&res->spinlock);
2635 	wake_up(&res->wq);
2636 
2637 	/* master is known, detach if not already detached */
2638 	dlm_mle_detach_hb_events(dlm, mle);
2639 	dlm_put_mle_inuse(mle);
2640 	ret = 0;
2641 
2642 	dlm_lockres_calc_usage(dlm, res);
2643 
2644 leave:
2645 	/* re-dirty the lockres if we failed */
2646 	if (ret < 0)
2647 		dlm_kick_thread(dlm, res);
2648 
2649 	/* wake up waiters if the MIGRATING flag got set
2650 	 * but migration failed */
2651 	if (wake)
2652 		wake_up(&res->wq);
2653 
2654 	/* TODO: cleanup */
2655 	if (mres)
2656 		free_page((unsigned long)mres);
2657 
2658 	dlm_put(dlm);
2659 
2660 	mlog(0, "returning %d\n", ret);
2661 	return ret;
2662 }
2663 
2664 #define DLM_MIGRATION_RETRY_MS  100
2665 
2666 /* Should be called only after beginning the domain leave process.
2667  * There should not be any remaining locks on nonlocal lock resources,
2668  * and there should be no local locks left on locally mastered resources.
2669  *
2670  * Called with the dlm spinlock held, may drop it to do migration, but
2671  * will re-acquire before exit.
2672  *
2673  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2674 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2675 {
2676 	int ret;
2677 	int lock_dropped = 0;
2678 	int numlocks;
2679 
2680 	spin_lock(&res->spinlock);
2681 	if (res->owner != dlm->node_num) {
2682 		if (!__dlm_lockres_unused(res)) {
2683 			mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2684 			     "trying to free this but locks remain\n",
2685 			     dlm->name, res->lockname.len, res->lockname.name);
2686 		}
2687 		spin_unlock(&res->spinlock);
2688 		goto leave;
2689 	}
2690 
2691 	/* No need to migrate a lockres having no locks */
2692 	ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2693 	if (ret >= 0 && numlocks == 0) {
2694 		spin_unlock(&res->spinlock);
2695 		goto leave;
2696 	}
2697 	spin_unlock(&res->spinlock);
2698 
2699 	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2700 	spin_unlock(&dlm->spinlock);
2701 	lock_dropped = 1;
2702 	while (1) {
2703 		ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2704 		if (ret >= 0)
2705 			break;
2706 		if (ret == -ENOTEMPTY) {
2707 			mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2708 			     res->lockname.len, res->lockname.name);
2709 			BUG();
2710 		}
2711 
2712 		mlog(0, "lockres %.*s: migrate failed, "
2713 		     "retrying\n", res->lockname.len,
2714 		     res->lockname.name);
2715 		msleep(DLM_MIGRATION_RETRY_MS);
2716 	}
2717 	spin_lock(&dlm->spinlock);
2718 leave:
2719 	return lock_dropped;
2720 }
2721 
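/* Returns nonzero once the lock has no queued or in-flight basts. */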
2722 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2723 {
2724 	int ret;
2725 	spin_lock(&dlm->ast_lock);
2726 	spin_lock(&lock->spinlock);
2727 	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2728 	spin_unlock(&lock->spinlock);
2729 	spin_unlock(&dlm->ast_lock);
2730 	return ret;
2731 }
2732 
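/* Migration may proceed once MIGRATING is set on the lockres, or
 * unconditionally if the migration target has left the domain (the
 * caller must then recheck the domain map). */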
2733 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2734 				     struct dlm_lock_resource *res,
2735 				     u8 mig_target)
2736 {
2737 	int can_proceed;
2738 	spin_lock(&res->spinlock);
2739 	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2740 	spin_unlock(&res->spinlock);
2741 
2742 	/* target has died, so make the caller break out of the
2743 	 * wait_event, but caller must recheck the domain_map */
2744 	spin_lock(&dlm->spinlock);
2745 	if (!test_bit(mig_target, dlm->domain_map))
2746 		can_proceed = 1;
2747 	spin_unlock(&dlm->spinlock);
2748 	return can_proceed;
2749 }
2750 
2751 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2752 				struct dlm_lock_resource *res)
2753 {
2754 	int ret;
2755 	spin_lock(&res->spinlock);
2756 	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2757 	spin_unlock(&res->spinlock);
2758 	return ret;
2759 }
2760 
2761 
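/* Get DLM_LOCK_RES_MIGRATING set on the lockres: flush all pending
 * asts via the dlm thread, block new dirtying, and wait until either
 * the flag is set or the migration target drops out of the domain. */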
2762 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2763 				       struct dlm_lock_resource *res,
2764 				       u8 target)
2765 {
2766 	int ret = 0;
2767 
2768 	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2769 	       res->lockname.len, res->lockname.name, dlm->node_num,
2770 	       target);
2771 	/* need to set MIGRATING flag on lockres.  this is done by
2772 	 * ensuring that all asts have been flushed for this lockres. */
2773 	spin_lock(&res->spinlock);
2774 	BUG_ON(res->migration_pending);
2775 	res->migration_pending = 1;
2776 	/* strategy is to reserve an extra ast then release
2777 	 * it below, letting the release do all of the work */
2778 	__dlm_lockres_reserve_ast(res);
2779 	spin_unlock(&res->spinlock);
2780 
2781 	/* now flush all the pending asts */
2782 	dlm_kick_thread(dlm, res);
2783 	/* before waiting on DIRTY, block processes which may
2784 	 * try to dirty the lockres before MIGRATING is set */
2785 	spin_lock(&res->spinlock);
2786 	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2787 	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2788 	spin_unlock(&res->spinlock);
2789 	/* now wait on any pending asts and the DIRTY state */
2790 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2791 	dlm_lockres_release_ast(dlm, res);
2792 
2793 	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2794 	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2795 	/* if the extra ref we just put was the final one, this
2796 	 * will pass thru immediately.  otherwise, we need to wait
2797 	 * for the last ast to finish. */
2798 again:
2799 	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2800 		   dlm_migration_can_proceed(dlm, res, target),
2801 		   msecs_to_jiffies(1000));
2802 	if (ret < 0) {
2803 		mlog(0, "woken again: migrating? %s, dead? %s\n",
2804 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2805 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2806 	} else {
2807 		mlog(0, "all is well: migrating? %s, dead? %s\n",
2808 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2809 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2810 	}
2811 	if (!dlm_migration_can_proceed(dlm, res, target)) {
2812 		mlog(0, "trying again...\n");
2813 		goto again;
2814 	}
2815 	/* now that we are sure the MIGRATING state is there, drop
2816 	 * the unneeded state which blocked threads trying to DIRTY */
2817 	spin_lock(&res->spinlock);
2818 	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2819 	BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2820 	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2821 	spin_unlock(&res->spinlock);
2822 
2823 	/* did the target go down or die? */
2824 	spin_lock(&dlm->spinlock);
2825 	if (!test_bit(target, dlm->domain_map)) {
2826 		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2827 		     target);
2828 		ret = -EHOSTDOWN;
2829 	}
2830 	spin_unlock(&dlm->spinlock);
2831 
2832 	/*
2833 	 * at this point:
2834 	 *
2835 	 *   o the DLM_LOCK_RES_MIGRATING flag is set
2836 	 *   o there are no pending asts on this lockres
2837 	 *   o all processes trying to reserve an ast on this
2838 	 *     lockres must wait for the MIGRATING flag to clear
2839 	 */
2840 	return ret;
2841 }
2842 
2843 /* last step in the migration process.
2844  * original master calls this to free all of the dlm_lock
2845  * structures that used to be for other nodes. */
2846 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2847 				      struct dlm_lock_resource *res)
2848 {
2849 	struct list_head *queue = &res->granted;
2850 	int i, bit;
2851 	struct dlm_lock *lock, *next;
2852 
2853 	assert_spin_locked(&res->spinlock);
2854 
2855 	BUG_ON(res->owner == dlm->node_num);
2856 
2857 	for (i=0; i<3; i++) {
2858 		list_for_each_entry_safe(lock, next, queue, list) {
2859 			if (lock->ml.node != dlm->node_num) {
2860 				mlog(0, "putting lock for node %u\n",
2861 				     lock->ml.node);
2862 				/* be extra careful */
2863 				BUG_ON(!list_empty(&lock->ast_list));
2864 				BUG_ON(!list_empty(&lock->bast_list));
2865 				BUG_ON(lock->ast_pending);
2866 				BUG_ON(lock->bast_pending);
2867 				dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2868 				list_del_init(&lock->list);
2869 				dlm_lock_put(lock);
2870 				/* In a normal unlock, we would have added a
2871 				 * DLM_UNLOCK_FREE_LOCK action. Force it. */
2872 				dlm_lock_put(lock);
2873 			}
2874 		}
2875 		queue++;
2876 	}
2877 	bit = 0;
2878 	while (1) {
2879 		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2880 		if (bit >= O2NM_MAX_NODES)
2881 			break;
2882 		/* do not clear the local node reference, if there is a
2883 		 * process holding this, let it drop the ref itself */
2884 		if (bit != dlm->node_num) {
2885 			mlog(0, "%s:%.*s: node %u had a ref to this "
2886 			     "migrating lockres, clearing\n", dlm->name,
2887 			     res->lockname.len, res->lockname.name, bit);
2888 			dlm_lockres_clear_refmap_bit(bit, res);
2889 		}
2890 		bit++;
2891 	}
2892 }
2893 
2894 /* for now this is not too intelligent.  we will
2895  * need stats to make this do the right thing.
2896  * this just finds the first lock on one of the
2897  * queues and uses that node as the target. */
2898 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2899 				    struct dlm_lock_resource *res)
2900 {
2901 	int i;
2902 	struct list_head *queue = &res->granted;
2903 	struct dlm_lock *lock;
2904 	int nodenum;
2905 
2906 	assert_spin_locked(&dlm->spinlock);
2907 
2908 	spin_lock(&res->spinlock);
2909 	for (i=0; i<3; i++) {
2910 		list_for_each_entry(lock, queue, list) {
2911 			/* up to the caller to make sure this node
2912 			 * is alive */
2913 			if (lock->ml.node != dlm->node_num) {
2914 				spin_unlock(&res->spinlock);
2915 				return lock->ml.node;
2916 			}
2917 		}
2918 		queue++;
2919 	}
2920 	spin_unlock(&res->spinlock);
2921 	mlog(0, "have not found a suitable target yet! checking domain map\n");
2922 
2923 	/* ok now we're getting desperate.  pick anyone alive. */
2924 	nodenum = -1;
2925 	while (1) {
2926 		nodenum = find_next_bit(dlm->domain_map,
2927 					O2NM_MAX_NODES, nodenum+1);
2928 		mlog(0, "found %d in domain map\n", nodenum);
2929 		if (nodenum >= O2NM_MAX_NODES)
2930 			break;
2931 		if (nodenum != dlm->node_num) {
2932 			mlog(0, "picking %d\n", nodenum);
2933 			return nodenum;
2934 		}
2935 	}
2936 
2937 	mlog(0, "giving up.  no master to migrate to\n");
2938 	return DLM_LOCK_RES_OWNER_UNKNOWN;
2939 }
2940 
2941 
2942 
2943 /* this is called by the new master once all lockres
2944  * data has been received */
2945 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2946 				  struct dlm_lock_resource *res,
2947 				  u8 master, u8 new_master,
2948 				  struct dlm_node_iter *iter)
2949 {
2950 	struct dlm_migrate_request migrate;
2951 	int ret, skip, status = 0;
2952 	int nodenum;
2953 
2954 	memset(&migrate, 0, sizeof(migrate));
2955 	migrate.namelen = res->lockname.len;
2956 	memcpy(migrate.name, res->lockname.name, migrate.namelen);
2957 	migrate.new_master = new_master;
2958 	migrate.master = master;
2959 
2960 	ret = 0;
2961 
2962 	/* send message to all nodes, except the master and myself */
2963 	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2964 		if (nodenum == master ||
2965 		    nodenum == new_master)
2966 			continue;
2967 
2968 		/* We could race a node exiting the domain; if so, skip it. */
2969 		spin_lock(&dlm->spinlock);
2970 		skip = (!test_bit(nodenum, dlm->domain_map));
2971 		spin_unlock(&dlm->spinlock);
2972 		if (skip) {
2973 			clear_bit(nodenum, iter->node_map);
2974 			continue;
2975 		}
2976 
2977 		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2978 					 &migrate, sizeof(migrate), nodenum,
2979 					 &status);
2980 		if (ret < 0) {
2981 			mlog(0, "migrate_request returned %d!\n", ret);
2982 			if (!dlm_is_host_down(ret)) {
2983 				mlog(ML_ERROR, "unhandled error=%d!\n", ret);
2984 				BUG();
2985 			}
2986 			clear_bit(nodenum, iter->node_map);
2987 			ret = 0;
2988 		} else if (status < 0) {
2989 			mlog(0, "migrate request (node %u) returned %d!\n",
2990 			     nodenum, status);
2991 			ret = status;
2992 		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
2993 			/* during the migration request we short-circuited
2994 			 * the mastery of the lockres.  make sure we have
2995 			 * a mastery ref for nodenum */
2996 			mlog(0, "%s:%.*s: need ref for node %u\n",
2997 			     dlm->name, res->lockname.len, res->lockname.name,
2998 			     nodenum);
2999 			spin_lock(&res->spinlock);
3000 			dlm_lockres_set_refmap_bit(nodenum, res);
3001 			spin_unlock(&res->spinlock);
3002 		}
3003 	}
3004 
3005 	if (ret < 0)
3006 		mlog_errno(ret);
3007 
3008 	mlog(0, "returning ret=%d\n", ret);
3009 	return ret;
3010 }
3011 
3012 
3013 /* if there is an existing mle for this lockres, we now know who the master is.
3014  * (the one who sent us *this* message) we can clear it up right away.
3015  * since the process that put the mle on the list still has a reference to it,
3016  * we can unhash it now, set the master and wake the process.  as a result,
3017  * we will have no mle in the list to start with.  now we can add an mle for
3018  * the migration and this should be the only one found for those scanning the
3019  * list.  */
3020 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3021 				void **ret_data)
3022 {
3023 	struct dlm_ctxt *dlm = data;
3024 	struct dlm_lock_resource *res = NULL;
3025 	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3026 	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3027 	const char *name;
3028 	unsigned int namelen, hash;
3029 	int ret = 0;
3030 
3031 	if (!dlm_grab(dlm))
3032 		return -EINVAL;
3033 
3034 	name = migrate->name;
3035 	namelen = migrate->namelen;
3036 	hash = dlm_lockid_hash(name, namelen);
3037 
3038 	/* preallocate.. if this fails, abort */
3039 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
3040 							 GFP_NOFS);
3041 
3042 	if (!mle) {
3043 		ret = -ENOMEM;
3044 		goto leave;
3045 	}
3046 
3047 	/* check for pre-existing lock */
3048 	spin_lock(&dlm->spinlock);
3049 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3050 	spin_lock(&dlm->master_lock);
3051 
3052 	if (res) {
3053 		spin_lock(&res->spinlock);
3054 		if (res->state & DLM_LOCK_RES_RECOVERING) {
3055 			/* if all is working ok, this can only mean that we got
3056 			 * a migrate request from a node that we now see as
3057 			 * dead.  what can we do here?  drop it to the floor? */
3058 			spin_unlock(&res->spinlock);
3059 			mlog(ML_ERROR, "Got a migrate request, but the "
3060 			     "lockres is marked as recovering!");
3061 			kmem_cache_free(dlm_mle_cache, mle);
3062 			ret = -EINVAL; /* need a better solution */
3063 			goto unlock;
3064 		}
3065 		res->state |= DLM_LOCK_RES_MIGRATING;
3066 		spin_unlock(&res->spinlock);
3067 	}
3068 
3069 	/* ignore status.  only nonzero status would BUG. */
3070 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3071 				    name, namelen,
3072 				    migrate->new_master,
3073 				    migrate->master);
3074 
3075 unlock:
3076 	spin_unlock(&dlm->master_lock);
3077 	spin_unlock(&dlm->spinlock);
3078 
3079 	if (oldmle) {
3080 		/* master is known, detach if not already detached */
3081 		dlm_mle_detach_hb_events(dlm, oldmle);
3082 		dlm_put_mle(oldmle);
3083 	}
3084 
3085 	if (res)
3086 		dlm_lockres_put(res);
3087 leave:
3088 	dlm_put(dlm);
3089 	return ret;
3090 }
3091 
3092 /* must be holding dlm->spinlock and dlm->master_lock
3093  * when adding a migration mle, we can clear any other mles
3094  * in the master list because we know with certainty that
3095  * the master is "master".  so we remove any old mle from
3096  * the list after setting its master field, and then add
3097  * the new migration mle.  this way we can hold with the rule
3098  * of having only one mle for a given lock name at all times. */
3099 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3100 				 struct dlm_lock_resource *res,
3101 				 struct dlm_master_list_entry *mle,
3102 				 struct dlm_master_list_entry **oldmle,
3103 				 const char *name, unsigned int namelen,
3104 				 u8 new_master, u8 master)
3105 {
3106 	int found;
3107 	int ret = 0;
3108 
3109 	*oldmle = NULL;
3110 
3111 	mlog_entry_void();
3112 
3113 	assert_spin_locked(&dlm->spinlock);
3114 	assert_spin_locked(&dlm->master_lock);
3115 
3116 	/* caller is responsible for any ref taken here on oldmle */
3117 	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3118 	if (found) {
3119 		struct dlm_master_list_entry *tmp = *oldmle;
3120 		spin_lock(&tmp->spinlock);
3121 		if (tmp->type == DLM_MLE_MIGRATION) {
3122 			if (master == dlm->node_num) {
3123 				/* ah another process raced me to it */
3124 				mlog(0, "tried to migrate %.*s, but some "
3125 				     "process beat me to it\n",
3126 				     namelen, name);
3127 				ret = -EEXIST;
3128 			} else {
3129 				/* bad.  2 NODES are trying to migrate! */
3130 				mlog(ML_ERROR, "migration error  mle: "
3131 				     "master=%u new_master=%u // request: "
3132 				     "master=%u new_master=%u // "
3133 				     "lockres=%.*s\n",
3134 				     tmp->master, tmp->new_master,
3135 				     master, new_master,
3136 				     namelen, name);
3137 				BUG();
3138 			}
3139 		} else {
3140 			/* this is essentially what assert_master does */
3141 			tmp->master = master;
3142 			atomic_set(&tmp->woken, 1);
3143 			wake_up(&tmp->wq);
3144 			/* remove it so that only one mle will be found */
3145 			__dlm_unlink_mle(dlm, tmp);
3146 			__dlm_mle_detach_hb_events(dlm, tmp);
3147 			ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3148 			mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3149 			    "telling master to get ref for cleared out mle "
3150 			    "during migration\n", dlm->name, namelen, name,
3151 			    master, new_master);
3152 		}
3153 		spin_unlock(&tmp->spinlock);
3154 	}
3155 
3156 	/* now add a migration mle to the tail of the list */
3157 	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3158 	mle->new_master = new_master;
3159 	/* the new master will be sending an assert master for this.
3160 	 * at that point we will get the refmap reference */
3161 	mle->master = master;
3162 	/* do this for consistency with other mle types */
3163 	set_bit(new_master, mle->maybe_map);
3164 	__dlm_insert_mle(dlm, mle);
3165 
3166 	return ret;
3167 }
3168 
3169 /*
3170  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3171  */
3172 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3173 					struct dlm_master_list_entry *mle)
3174 {
3175 	struct dlm_lock_resource *res;
3176 
3177 	/* Find the lockres associated to the mle and set its owner to UNK */
3178 	res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3179 				   mle->mnamehash);
3180 	if (res) {
3181 		spin_unlock(&dlm->master_lock);
3182 
3183 		/* move lockres onto recovery list */
3184 		spin_lock(&res->spinlock);
3185 		dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3186 		dlm_move_lockres_to_recovery_list(dlm, res);
3187 		spin_unlock(&res->spinlock);
3188 		dlm_lockres_put(res);
3189 
3190 		/* about to get rid of mle, detach from heartbeat */
3191 		__dlm_mle_detach_hb_events(dlm, mle);
3192 
3193 		/* dump the mle */
3194 		spin_lock(&dlm->master_lock);
3195 		__dlm_put_mle(mle);
3196 		spin_unlock(&dlm->master_lock);
3197 	}
3198 
3199 	return res;
3200 }
3201 
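/* Unlink a migration mle whose old or new master died: detach it from
 * heartbeat, pull it off the master hash and wake anyone waiting on it.
 * Caller holds dlm->spinlock and dlm->master_lock. */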
3202 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3203 				    struct dlm_master_list_entry *mle)
3204 {
3205 	__dlm_mle_detach_hb_events(dlm, mle);
3206 
3207 	spin_lock(&mle->spinlock);
3208 	__dlm_unlink_mle(dlm, mle);
3209 	atomic_set(&mle->woken, 1);
3210 	spin_unlock(&mle->spinlock);
3211 
3212 	wake_up(&mle->wq);
3213 }
3214 
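/* A BLOCK mle only needs cleanup if the dead node was the one expected
 * to win mastery: its assert_master will never arrive, so drop the ref
 * that message would have released and wake any waiter. */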
3215 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3216 				struct dlm_master_list_entry *mle, u8 dead_node)
3217 {
3218 	int bit;
3219 
3220 	BUG_ON(mle->type != DLM_MLE_BLOCK);
3221 
3222 	spin_lock(&mle->spinlock);
3223 	bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3224 	if (bit != dead_node) {
3225 		mlog(0, "mle found, but dead node %u would not have been "
3226 		     "master\n", dead_node);
3227 		spin_unlock(&mle->spinlock);
3228 	} else {
3229 		/* Must drop the refcount by one since the assert_master will
3230 		 * never arrive. This may result in the mle being unlinked and
3231 		 * freed, but there may still be a process waiting in the
3232 		 * dlmlock path which is fine. */
3233 		mlog(0, "node %u was expected master\n", dead_node);
3234 		atomic_set(&mle->woken, 1);
3235 		spin_unlock(&mle->spinlock);
3236 		wake_up(&mle->wq);
3237 
3238 		/* Do not need events any longer, so detach from heartbeat */
3239 		__dlm_mle_detach_hb_events(dlm, mle);
3240 		__dlm_put_mle(mle);
3241 	}
3242 }
3243 
3244 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3245 {
3246 	struct dlm_master_list_entry *mle;
3247 	struct dlm_lock_resource *res;
3248 	struct hlist_head *bucket;
3249 	struct hlist_node *list;
3250 	unsigned int i;
3251 
3252 	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
3253 top:
3254 	assert_spin_locked(&dlm->spinlock);
3255 
3256 	/* clean the master list */
3257 	spin_lock(&dlm->master_lock);
3258 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3259 		bucket = dlm_master_hash(dlm, i);
3260 		hlist_for_each(list, bucket) {
3261 			mle = hlist_entry(list, struct dlm_master_list_entry,
3262 					  master_hash_node);
3263 
3264 			BUG_ON(mle->type != DLM_MLE_BLOCK &&
3265 			       mle->type != DLM_MLE_MASTER &&
3266 			       mle->type != DLM_MLE_MIGRATION);
3267 
3268 			/* MASTER mles are initiated locally. The waiting
3269 			 * process will notice the node map change shortly.
3270 			 * Let that happen as normal. */
3271 			if (mle->type == DLM_MLE_MASTER)
3272 				continue;
3273 
3274 			/* BLOCK mles are initiated by other nodes. Need to
3275 			 * clean up if the dead node would have been the
3276 			 * master. */
3277 			if (mle->type == DLM_MLE_BLOCK) {
3278 				dlm_clean_block_mle(dlm, mle, dead_node);
3279 				continue;
3280 			}
3281 
3282 			/* Everything else is a MIGRATION mle */
3283 
3284 			/* The rule for MIGRATION mles is that the master
3285 			 * becomes UNKNOWN if *either* the original or the new
3286 			 * master dies. All UNKNOWN lockres' are sent to
3287 			 * whichever node becomes the recovery master. The new
3288 			 * master is responsible for determining if there is
3289 			 * still a master for this lockres, or if he needs to
3290 			 * take over mastery. Either way, this node should
3291 			 * expect another message to resolve this. */
3292 
3293 			if (mle->master != dead_node &&
3294 			    mle->new_master != dead_node)
3295 				continue;
3296 
3297 			/* If we have reached this point, this mle needs to be
3298 			 * removed from the list and freed. */
3299 			dlm_clean_migration_mle(dlm, mle);
3300 
3301 			mlog(0, "%s: node %u died during migration from "
3302 			     "%u to %u!\n", dlm->name, dead_node, mle->master,
3303 			     mle->new_master);
3304 
3305 			/* If we find a lockres associated with the mle, we've
3306 			 * hit this rare case that messes up our lock ordering.
3307 			 * If so, we need to drop the master lock so that we can
3308 			 * take the lockres lock, meaning that we will have to
3309 			 * restart from the head of list. */
3310 			res = dlm_reset_mleres_owner(dlm, mle);
3311 			if (res)
3312 				/* restart */
3313 				goto top;
3314 
3315 			/* This may be the last reference */
3316 			__dlm_put_mle(mle);
3317 		}
3318 	}
3319 	spin_unlock(&dlm->master_lock);
3320 }
3321 
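/* Final step on the new master once all lock state has been received:
 * take a mastery ref for the old master, tell the other nodes about
 * the new owner, assert mastery everywhere (the old master last), then
 * claim ownership and clear the MIGRATING flag. */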
3322 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3323 			 u8 old_master)
3324 {
3325 	struct dlm_node_iter iter;
3326 	int ret = 0;
3327 
3328 	spin_lock(&dlm->spinlock);
3329 	dlm_node_iter_init(dlm->domain_map, &iter);
3330 	clear_bit(old_master, iter.node_map);
3331 	clear_bit(dlm->node_num, iter.node_map);
3332 	spin_unlock(&dlm->spinlock);
3333 
3334 	/* ownership of the lockres is changing.  account for the
3335 	 * mastery reference here since old_master will briefly have
3336 	 * a reference after the migration completes */
3337 	spin_lock(&res->spinlock);
3338 	dlm_lockres_set_refmap_bit(old_master, res);
3339 	spin_unlock(&res->spinlock);
3340 
3341 	mlog(0, "now time to do a migrate request to other nodes\n");
3342 	ret = dlm_do_migrate_request(dlm, res, old_master,
3343 				     dlm->node_num, &iter);
3344 	if (ret < 0) {
3345 		mlog_errno(ret);
3346 		goto leave;
3347 	}
3348 
3349 	mlog(0, "doing assert master of %.*s to all except the original node\n",
3350 	     res->lockname.len, res->lockname.name);
3351 	/* this call now finishes out the nodemap
3352 	 * even if one or more nodes die */
3353 	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3354 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3355 	if (ret < 0) {
3356 		/* no longer need to retry.  all living nodes contacted. */
3357 		mlog_errno(ret);
3358 		ret = 0;
3359 	}
3360 
3361 	memset(iter.node_map, 0, sizeof(iter.node_map));
3362 	set_bit(old_master, iter.node_map);
3363 	mlog(0, "doing assert master of %.*s back to %u\n",
3364 	     res->lockname.len, res->lockname.name, old_master);
3365 	ret = dlm_do_assert_master(dlm, res, iter.node_map,
3366 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
3367 	if (ret < 0) {
3368 		mlog(0, "assert master to original master failed "
3369 		     "with %d.\n", ret);
3370 		/* the only nonzero status here would be because of
3371 		 * a dead original node.  we're done. */
3372 		ret = 0;
3373 	}
3374 
3375 	/* all done, set the owner, clear the flag */
3376 	spin_lock(&res->spinlock);
3377 	dlm_set_lockres_owner(dlm, res, dlm->node_num);
3378 	res->state &= ~DLM_LOCK_RES_MIGRATING;
3379 	spin_unlock(&res->spinlock);
3380 	/* re-dirty it on the new master */
3381 	dlm_kick_thread(dlm, res);
3382 	wake_up(&res->wq);
3383 leave:
3384 	return ret;
3385 }
3386 
3387 /*
3388  * LOCKRES AST REFCOUNT
3389  * this is integral to migration
3390  */
3391 
3392 /* for future intent to call an ast, reserve one ahead of time.
3393  * this should be called only after waiting on the lockres
3394  * with dlm_wait_on_lockres, and while still holding the
3395  * spinlock after the call. */
3396 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3397 {
3398 	assert_spin_locked(&res->spinlock);
3399 	if (res->state & DLM_LOCK_RES_MIGRATING) {
3400 		__dlm_print_one_lock_resource(res);
3401 	}
3402 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3403 
3404 	atomic_inc(&res->asts_reserved);
3405 }
3406 
3407 /*
3408  * used to drop the reserved ast, either because it went unused,
3409  * or because the ast/bast was actually called.
3410  *
3411  * also, if there is a pending migration on this lockres,
3412  * and this was the last pending ast on the lockres,
3413  * atomically set the MIGRATING flag before we drop the lock.
3414  * this is how we ensure that migration can proceed with no
3415  * asts in progress.  note that it is ok if the state of the
3416  * queues is such that a lock should be granted in the future
3417  * or that a bast should be fired, because the new master will
3418  * shuffle the lists on this lockres as soon as it is migrated.
3419  */
3420 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3421 			     struct dlm_lock_resource *res)
3422 {
3423 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3424 		return;
3425 
3426 	if (!res->migration_pending) {
3427 		spin_unlock(&res->spinlock);
3428 		return;
3429 	}
3430 
3431 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3432 	res->migration_pending = 0;
3433 	res->state |= DLM_LOCK_RES_MIGRATING;
3434 	spin_unlock(&res->spinlock);
3435 	wake_up(&res->wq);
3436 	wake_up(&dlm->migration_wq);
3437 }
3438