xref: /openbmc/linux/fs/ocfs2/dlm/dlmmaster.c (revision a2bf0477)
16714d8e8SKurt Hackel /* -*- mode: c; c-basic-offset: 8; -*-
26714d8e8SKurt Hackel  * vim: noexpandtab sw=8 ts=8 sts=0:
36714d8e8SKurt Hackel  *
46714d8e8SKurt Hackel  * dlmmod.c
56714d8e8SKurt Hackel  *
66714d8e8SKurt Hackel  * standalone DLM module
76714d8e8SKurt Hackel  *
86714d8e8SKurt Hackel  * Copyright (C) 2004 Oracle.  All rights reserved.
96714d8e8SKurt Hackel  *
106714d8e8SKurt Hackel  * This program is free software; you can redistribute it and/or
116714d8e8SKurt Hackel  * modify it under the terms of the GNU General Public
126714d8e8SKurt Hackel  * License as published by the Free Software Foundation; either
136714d8e8SKurt Hackel  * version 2 of the License, or (at your option) any later version.
146714d8e8SKurt Hackel  *
156714d8e8SKurt Hackel  * This program is distributed in the hope that it will be useful,
166714d8e8SKurt Hackel  * but WITHOUT ANY WARRANTY; without even the implied warranty of
176714d8e8SKurt Hackel  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
186714d8e8SKurt Hackel  * General Public License for more details.
196714d8e8SKurt Hackel  *
206714d8e8SKurt Hackel  * You should have received a copy of the GNU General Public
216714d8e8SKurt Hackel  * License along with this program; if not, write to the
226714d8e8SKurt Hackel  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
236714d8e8SKurt Hackel  * Boston, MA 021110-1307, USA.
246714d8e8SKurt Hackel  *
256714d8e8SKurt Hackel  */
266714d8e8SKurt Hackel 
276714d8e8SKurt Hackel 
286714d8e8SKurt Hackel #include <linux/module.h>
296714d8e8SKurt Hackel #include <linux/fs.h>
306714d8e8SKurt Hackel #include <linux/types.h>
316714d8e8SKurt Hackel #include <linux/slab.h>
326714d8e8SKurt Hackel #include <linux/highmem.h>
336714d8e8SKurt Hackel #include <linux/utsname.h>
346714d8e8SKurt Hackel #include <linux/init.h>
356714d8e8SKurt Hackel #include <linux/sysctl.h>
366714d8e8SKurt Hackel #include <linux/random.h>
376714d8e8SKurt Hackel #include <linux/blkdev.h>
386714d8e8SKurt Hackel #include <linux/socket.h>
396714d8e8SKurt Hackel #include <linux/inet.h>
406714d8e8SKurt Hackel #include <linux/spinlock.h>
416714d8e8SKurt Hackel #include <linux/delay.h>
426714d8e8SKurt Hackel 
436714d8e8SKurt Hackel 
446714d8e8SKurt Hackel #include "cluster/heartbeat.h"
456714d8e8SKurt Hackel #include "cluster/nodemanager.h"
466714d8e8SKurt Hackel #include "cluster/tcp.h"
476714d8e8SKurt Hackel 
486714d8e8SKurt Hackel #include "dlmapi.h"
496714d8e8SKurt Hackel #include "dlmcommon.h"
506714d8e8SKurt Hackel #include "dlmdebug.h"
5182353b59SAdrian Bunk #include "dlmdomain.h"
526714d8e8SKurt Hackel 
536714d8e8SKurt Hackel #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
546714d8e8SKurt Hackel #include "cluster/masklog.h"
556714d8e8SKurt Hackel 
/*
 * Flavours of a master list entry (mle):
 *  - DLM_MLE_BLOCK:     keyed by lock name (u.name); no lockres attached yet
 *  - DLM_MLE_MASTER:    keyed by an attached lockres (u.res)
 *  - DLM_MLE_MIGRATION: keyed by lock name (u.name); tracks a master handoff
 *    from mle->master to mle->new_master (see dlm_add_migration_mle)
 */
enum dlm_mle_type {
	DLM_MLE_BLOCK,
	DLM_MLE_MASTER,
	DLM_MLE_MIGRATION
};
616714d8e8SKurt Hackel 
/* inline copy of a lock name for mles that have no lockres attached */
struct dlm_lock_name
{
	u8 len;				/* valid bytes in name[] */
	u8 name[DLM_LOCKID_NAME_MAX];	/* not NUL-terminated; use len */
};
676714d8e8SKurt Hackel 
/*
 * One in-flight mastery (or migration) operation for a single lock
 * resource.  Lives on dlm->master_list and, while heartbeat events are
 * still interesting, on dlm->mle_hb_events.
 */
struct dlm_master_list_entry
{
	struct list_head list;		/* linkage on dlm->master_list */
	struct list_head hb_events;	/* linkage on dlm->mle_hb_events */
	struct dlm_ctxt *dlm;		/* owning domain */
	spinlock_t spinlock;		/* protects the node bitmaps below */
	wait_queue_head_t wq;		/* waiters for mastery progress */
	atomic_t woken;			/* wakeup flag checked by waiters */
	struct kref mle_refs;		/* lifetime; freed in dlm_mle_release */
	/* pin count taken under dlm->spinlock + dlm->master_lock; see
	 * dlm_get_mle_inuse()/dlm_put_mle_inuse() */
	int inuse;
	/* per-node bitmaps, all O2NM_MAX_NODES wide */
	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	u8 master;	/* current master; O2NM_MAX_NODES == not yet known */
	u8 new_master;	/* migration target; O2NM_MAX_NODES == unset */
	enum dlm_mle_type type;
	struct o2hb_callback_func mle_hb_up;
	struct o2hb_callback_func mle_hb_down;
	union {
		struct dlm_lock_resource *res;	/* DLM_MLE_MASTER */
		struct dlm_lock_name name;	/* DLM_MLE_BLOCK/MIGRATION */
	} u;
};
926714d8e8SKurt Hackel 
/* heartbeat callbacks: update an mle's node_map as nodes die/appear */
static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node,
			      int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node,
			    int idx);

/* assert-master machinery (worker + network send) */
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags);
1066714d8e8SKurt Hackel 
1076714d8e8SKurt Hackel static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
1086714d8e8SKurt Hackel 				struct dlm_master_list_entry *mle,
1096714d8e8SKurt Hackel 				const char *name,
1106714d8e8SKurt Hackel 				unsigned int namelen)
1116714d8e8SKurt Hackel {
1126714d8e8SKurt Hackel 	struct dlm_lock_resource *res;
1136714d8e8SKurt Hackel 
1146714d8e8SKurt Hackel 	if (dlm != mle->dlm)
1156714d8e8SKurt Hackel 		return 0;
1166714d8e8SKurt Hackel 
1176714d8e8SKurt Hackel 	if (mle->type == DLM_MLE_BLOCK ||
1186714d8e8SKurt Hackel 	    mle->type == DLM_MLE_MIGRATION) {
1196714d8e8SKurt Hackel 		if (namelen != mle->u.name.len ||
1206714d8e8SKurt Hackel     	    	    memcmp(name, mle->u.name.name, namelen)!=0)
1216714d8e8SKurt Hackel 			return 0;
1226714d8e8SKurt Hackel 	} else {
1236714d8e8SKurt Hackel 		res = mle->u.res;
1246714d8e8SKurt Hackel 		if (namelen != res->lockname.len ||
1256714d8e8SKurt Hackel 		    memcmp(res->lockname.name, name, namelen) != 0)
1266714d8e8SKurt Hackel 			return 0;
1276714d8e8SKurt Hackel 	}
1286714d8e8SKurt Hackel 	return 1;
1296714d8e8SKurt Hackel }
1306714d8e8SKurt Hackel 
#if 0
/* Code here is included but defined out as it aids debugging */

/* print the set bits of a node bitmap as "mapname=[ n1 n2 ... ]" */
#define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
void _dlm_print_nodemap(unsigned long *map, const char *mapname)
{
	int i;
	printk("%s=[ ", mapname);
	for (i=0; i<O2NM_MAX_NODES; i++)
		if (test_bit(i, map))
			printk("%d ", i);
	printk("]");
}

/* dump one mle: type, refcount, masters, hb attachment, and all bitmaps */
void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
	int refs;
	char *type;
	char attached;
	u8 master;
	unsigned int namelen;
	const char *name;
	struct kref *k;
	unsigned long *maybe = mle->maybe_map,
		      *vote = mle->vote_map,
		      *resp = mle->response_map,
		      *node = mle->node_map;

	k = &mle->mle_refs;
	if (mle->type == DLM_MLE_BLOCK)
		type = "BLK";
	else if (mle->type == DLM_MLE_MASTER)
		type = "MAS";
	else
		type = "MIG";
	refs = atomic_read(&k->refcount);
	master = mle->master;
	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');

	/* only DLM_MLE_MASTER has a lockres to take the name from */
	if (mle->type != DLM_MLE_MASTER) {
		namelen = mle->u.name.len;
		name = mle->u.name.name;
	} else {
		namelen = mle->u.res->lockname.len;
		name = mle->u.res->lockname.name;
	}

	mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
		  namelen, name, type, refs, master, mle->new_master, attached,
		  mle->inuse);
	dlm_print_nodemap(maybe);
	printk(", ");
	dlm_print_nodemap(vote);
	printk(", ");
	dlm_print_nodemap(resp);
	printk(", ");
	dlm_print_nodemap(node);
	printk(", ");
	printk("\n");
}

/* dump every mle of one domain; takes dlm->master_lock */
static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
	spin_lock(&dlm->master_lock);
	list_for_each(iter, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);
		dlm_print_one_mle(mle);
	}
	spin_unlock(&dlm->master_lock);
}

/* dump the mles of every registered domain; returns len unchanged so a
 * proc/debug write appears fully consumed */
int dlm_dump_all_mles(const char __user *data, unsigned int len)
{
	struct list_head *iter;
	struct dlm_ctxt *dlm;

	spin_lock(&dlm_domain_lock);
	list_for_each(iter, &dlm_domains) {
		dlm = list_entry (iter, struct dlm_ctxt, list);
		mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
		dlm_dump_mles(dlm);
	}
	spin_unlock(&dlm_domain_lock);
	return len;
}
EXPORT_SYMBOL_GPL(dlm_dump_all_mles);

#endif  /*  0  */
2236714d8e8SKurt Hackel 
2246714d8e8SKurt Hackel 
2256714d8e8SKurt Hackel static kmem_cache_t *dlm_mle_cache = NULL;
2266714d8e8SKurt Hackel 
2276714d8e8SKurt Hackel 
/* mle lifetime helpers */
static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
			enum dlm_mle_type type,
			struct dlm_ctxt *dlm,
			struct dlm_lock_resource *res,
			const char *name,
			unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
			struct dlm_master_list_entry **mle,
			char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);

/* lock mastery protocol steps */
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master);

/* migration helpers */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res,
				       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res);
2686714d8e8SKurt Hackel 
2696714d8e8SKurt Hackel 
/*
 * Classify a network/transport error: return 1 when @err (a negative
 * errno as returned by the o2net/tcp layer) means the remote host is
 * down or unreachable, 0 for any other value.
 *
 * Fix: the parameter was named "errno", which collides with the ISO C
 * <errno.h> macro of the same name; renamed to "err".  C arguments are
 * positional, so callers and the header prototype are unaffected.
 */
int dlm_is_host_down(int err)
{
	switch (err) {
		case -EBADF:
		case -ECONNREFUSED:
		case -ENOTCONN:
		case -ECONNRESET:
		case -EPIPE:
		case -EHOSTDOWN:
		case -EHOSTUNREACH:
		case -ETIMEDOUT:
		case -ECONNABORTED:
		case -ENETDOWN:
		case -ENETUNREACH:
		case -ENETRESET:
		case -ESHUTDOWN:
		case -ENOPROTOOPT:
		case -EINVAL:   /* if returned from our tcp code,
				   this means there is no socket */
			return 1;
	}
	return 0;
}
2936714d8e8SKurt Hackel 
2946714d8e8SKurt Hackel 
2956714d8e8SKurt Hackel /*
2966714d8e8SKurt Hackel  * MASTER LIST FUNCTIONS
2976714d8e8SKurt Hackel  */
2986714d8e8SKurt Hackel 
2996714d8e8SKurt Hackel 
3006714d8e8SKurt Hackel /*
3016714d8e8SKurt Hackel  * regarding master list entries and heartbeat callbacks:
3026714d8e8SKurt Hackel  *
3036714d8e8SKurt Hackel  * in order to avoid sleeping and allocation that occurs in
3046714d8e8SKurt Hackel  * heartbeat, master list entries are simply attached to the
3056714d8e8SKurt Hackel  * dlm's established heartbeat callbacks.  the mle is attached
3066714d8e8SKurt Hackel  * when it is created, and since the dlm->spinlock is held at
3076714d8e8SKurt Hackel  * that time, any heartbeat event will be properly discovered
3086714d8e8SKurt Hackel  * by the mle.  the mle needs to be detached from the
3096714d8e8SKurt Hackel  * dlm->mle_hb_events list as soon as heartbeat events are no
3106714d8e8SKurt Hackel  * longer useful to the mle, and before the mle is freed.
3116714d8e8SKurt Hackel  *
3126714d8e8SKurt Hackel  * as a general rule, heartbeat events are no longer needed by
3136714d8e8SKurt Hackel  * the mle once an "answer" regarding the lock master has been
3146714d8e8SKurt Hackel  * received.
3156714d8e8SKurt Hackel  */
/* Attach @mle to the domain's heartbeat event list so node up/down
 * events reach it; caller must hold dlm->spinlock (asserted). */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	assert_spin_locked(&dlm->spinlock);

	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}
3236714d8e8SKurt Hackel 
3246714d8e8SKurt Hackel 
3256714d8e8SKurt Hackel static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
3266714d8e8SKurt Hackel 					      struct dlm_master_list_entry *mle)
3276714d8e8SKurt Hackel {
3286714d8e8SKurt Hackel 	if (!list_empty(&mle->hb_events))
3296714d8e8SKurt Hackel 		list_del_init(&mle->hb_events);
3306714d8e8SKurt Hackel }
3316714d8e8SKurt Hackel 
3326714d8e8SKurt Hackel 
/* Locked wrapper around __dlm_mle_detach_hb_events(): takes
 * dlm->spinlock for callers that do not already hold it. */
static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
					    struct dlm_master_list_entry *mle)
{
	spin_lock(&dlm->spinlock);
	__dlm_mle_detach_hb_events(dlm, mle);
	spin_unlock(&dlm->spinlock);
}
3406714d8e8SKurt Hackel 
/* Pin @mle as "in use": bump the inuse count and take a kref.  Must be
 * called with both dlm->spinlock and dlm->master_lock held (asserted).
 * Paired with dlm_put_mle_inuse(). */
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	mle->inuse++;
	kref_get(&mle->mle_refs);
}
351a2bf0477SKurt Hackel 
352a2bf0477SKurt Hackel static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
353a2bf0477SKurt Hackel {
354a2bf0477SKurt Hackel 	struct dlm_ctxt *dlm;
355a2bf0477SKurt Hackel 	dlm = mle->dlm;
356a2bf0477SKurt Hackel 
357a2bf0477SKurt Hackel 	spin_lock(&dlm->spinlock);
358a2bf0477SKurt Hackel 	spin_lock(&dlm->master_lock);
359a2bf0477SKurt Hackel 	mle->inuse--;
360a2bf0477SKurt Hackel 	__dlm_put_mle(mle);
361a2bf0477SKurt Hackel 	spin_unlock(&dlm->master_lock);
362a2bf0477SKurt Hackel 	spin_unlock(&dlm->spinlock);
363a2bf0477SKurt Hackel 
364a2bf0477SKurt Hackel }
365a2bf0477SKurt Hackel 
/* remove from list and free */
/* Drop one reference on @mle; when it is the last one,
 * dlm_mle_release() unlinks and frees the entry.  Both dlm->spinlock
 * and dlm->master_lock must be held (asserted). */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
	struct dlm_ctxt *dlm;
	dlm = mle->dlm;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);
	/* putting a ref that was never taken would be a refcounting bug */
	BUG_ON(!atomic_read(&mle->mle_refs.refcount));

	kref_put(&mle->mle_refs, dlm_mle_release);
}
3786714d8e8SKurt Hackel 
3796714d8e8SKurt Hackel 
3806714d8e8SKurt Hackel /* must not have any spinlocks coming in */
3816714d8e8SKurt Hackel static void dlm_put_mle(struct dlm_master_list_entry *mle)
3826714d8e8SKurt Hackel {
3836714d8e8SKurt Hackel 	struct dlm_ctxt *dlm;
3846714d8e8SKurt Hackel 	dlm = mle->dlm;
3856714d8e8SKurt Hackel 
3866714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
3876714d8e8SKurt Hackel 	spin_lock(&dlm->master_lock);
3886714d8e8SKurt Hackel 	__dlm_put_mle(mle);
3896714d8e8SKurt Hackel 	spin_unlock(&dlm->master_lock);
3906714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
3916714d8e8SKurt Hackel }
3926714d8e8SKurt Hackel 
/* Take an extra reference on @mle; paired with dlm_put_mle(). */
static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
	kref_get(&mle->mle_refs);
}
3976714d8e8SKurt Hackel 
3986714d8e8SKurt Hackel static void dlm_init_mle(struct dlm_master_list_entry *mle,
3996714d8e8SKurt Hackel 			enum dlm_mle_type type,
4006714d8e8SKurt Hackel 			struct dlm_ctxt *dlm,
4016714d8e8SKurt Hackel 			struct dlm_lock_resource *res,
4026714d8e8SKurt Hackel 			const char *name,
4036714d8e8SKurt Hackel 			unsigned int namelen)
4046714d8e8SKurt Hackel {
4056714d8e8SKurt Hackel 	assert_spin_locked(&dlm->spinlock);
4066714d8e8SKurt Hackel 
4076714d8e8SKurt Hackel 	mle->dlm = dlm;
4086714d8e8SKurt Hackel 	mle->type = type;
4096714d8e8SKurt Hackel 	INIT_LIST_HEAD(&mle->list);
4106714d8e8SKurt Hackel 	INIT_LIST_HEAD(&mle->hb_events);
4116714d8e8SKurt Hackel 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
4126714d8e8SKurt Hackel 	spin_lock_init(&mle->spinlock);
4136714d8e8SKurt Hackel 	init_waitqueue_head(&mle->wq);
4146714d8e8SKurt Hackel 	atomic_set(&mle->woken, 0);
4156714d8e8SKurt Hackel 	kref_init(&mle->mle_refs);
4166714d8e8SKurt Hackel 	memset(mle->response_map, 0, sizeof(mle->response_map));
4176714d8e8SKurt Hackel 	mle->master = O2NM_MAX_NODES;
4186714d8e8SKurt Hackel 	mle->new_master = O2NM_MAX_NODES;
419a2bf0477SKurt Hackel 	mle->inuse = 0;
4206714d8e8SKurt Hackel 
4216714d8e8SKurt Hackel 	if (mle->type == DLM_MLE_MASTER) {
4226714d8e8SKurt Hackel 		BUG_ON(!res);
4236714d8e8SKurt Hackel 		mle->u.res = res;
4246714d8e8SKurt Hackel 	} else if (mle->type == DLM_MLE_BLOCK) {
4256714d8e8SKurt Hackel 		BUG_ON(!name);
4266714d8e8SKurt Hackel 		memcpy(mle->u.name.name, name, namelen);
4276714d8e8SKurt Hackel 		mle->u.name.len = namelen;
4286714d8e8SKurt Hackel 	} else /* DLM_MLE_MIGRATION */ {
4296714d8e8SKurt Hackel 		BUG_ON(!name);
4306714d8e8SKurt Hackel 		memcpy(mle->u.name.name, name, namelen);
4316714d8e8SKurt Hackel 		mle->u.name.len = namelen;
4326714d8e8SKurt Hackel 	}
4336714d8e8SKurt Hackel 
4346714d8e8SKurt Hackel 	/* copy off the node_map and register hb callbacks on our copy */
4356714d8e8SKurt Hackel 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
4366714d8e8SKurt Hackel 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
4376714d8e8SKurt Hackel 	clear_bit(dlm->node_num, mle->vote_map);
4386714d8e8SKurt Hackel 	clear_bit(dlm->node_num, mle->node_map);
4396714d8e8SKurt Hackel 
4406714d8e8SKurt Hackel 	/* attach the mle to the domain node up/down events */
4416714d8e8SKurt Hackel 	__dlm_mle_attach_hb_events(dlm, mle);
4426714d8e8SKurt Hackel }
4436714d8e8SKurt Hackel 
4446714d8e8SKurt Hackel 
4456714d8e8SKurt Hackel /* returns 1 if found, 0 if not */
4466714d8e8SKurt Hackel static int dlm_find_mle(struct dlm_ctxt *dlm,
4476714d8e8SKurt Hackel 			struct dlm_master_list_entry **mle,
4486714d8e8SKurt Hackel 			char *name, unsigned int namelen)
4496714d8e8SKurt Hackel {
4506714d8e8SKurt Hackel 	struct dlm_master_list_entry *tmpmle;
4516714d8e8SKurt Hackel 	struct list_head *iter;
4526714d8e8SKurt Hackel 
4536714d8e8SKurt Hackel 	assert_spin_locked(&dlm->master_lock);
4546714d8e8SKurt Hackel 
4556714d8e8SKurt Hackel 	list_for_each(iter, &dlm->master_list) {
4566714d8e8SKurt Hackel 		tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
4576714d8e8SKurt Hackel 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
4586714d8e8SKurt Hackel 			continue;
4596714d8e8SKurt Hackel 		dlm_get_mle(tmpmle);
4606714d8e8SKurt Hackel 		*mle = tmpmle;
4616714d8e8SKurt Hackel 		return 1;
4626714d8e8SKurt Hackel 	}
4636714d8e8SKurt Hackel 	return 0;
4646714d8e8SKurt Hackel }
4656714d8e8SKurt Hackel 
/* Fan a heartbeat event for node @idx out to every mle currently
 * attached to dlm->mle_hb_events.  @node_up selects up vs. down.
 * Caller must hold dlm->spinlock (asserted), which also keeps the
 * list stable during the walk. */
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
	struct dlm_master_list_entry *mle;
	struct list_head *iter;

	assert_spin_locked(&dlm->spinlock);

	list_for_each(iter, &dlm->mle_hb_events) {
		mle = list_entry(iter, struct dlm_master_list_entry,
				 hb_events);
		if (node_up)
			dlm_mle_node_up(dlm, mle, NULL, idx);
		else
			dlm_mle_node_down(dlm, mle, NULL, idx);
	}
}
4826714d8e8SKurt Hackel 
4836714d8e8SKurt Hackel static void dlm_mle_node_down(struct dlm_ctxt *dlm,
4846714d8e8SKurt Hackel 			      struct dlm_master_list_entry *mle,
4856714d8e8SKurt Hackel 			      struct o2nm_node *node, int idx)
4866714d8e8SKurt Hackel {
4876714d8e8SKurt Hackel 	spin_lock(&mle->spinlock);
4886714d8e8SKurt Hackel 
4896714d8e8SKurt Hackel 	if (!test_bit(idx, mle->node_map))
4906714d8e8SKurt Hackel 		mlog(0, "node %u already removed from nodemap!\n", idx);
4916714d8e8SKurt Hackel 	else
4926714d8e8SKurt Hackel 		clear_bit(idx, mle->node_map);
4936714d8e8SKurt Hackel 
4946714d8e8SKurt Hackel 	spin_unlock(&mle->spinlock);
4956714d8e8SKurt Hackel }
4966714d8e8SKurt Hackel 
4976714d8e8SKurt Hackel static void dlm_mle_node_up(struct dlm_ctxt *dlm,
4986714d8e8SKurt Hackel 			    struct dlm_master_list_entry *mle,
4996714d8e8SKurt Hackel 			    struct o2nm_node *node, int idx)
5006714d8e8SKurt Hackel {
5016714d8e8SKurt Hackel 	spin_lock(&mle->spinlock);
5026714d8e8SKurt Hackel 
5036714d8e8SKurt Hackel 	if (test_bit(idx, mle->node_map))
5046714d8e8SKurt Hackel 		mlog(0, "node %u already in node map!\n", idx);
5056714d8e8SKurt Hackel 	else
5066714d8e8SKurt Hackel 		set_bit(idx, mle->node_map);
5076714d8e8SKurt Hackel 
5086714d8e8SKurt Hackel 	spin_unlock(&mle->spinlock);
5096714d8e8SKurt Hackel }
5106714d8e8SKurt Hackel 
5116714d8e8SKurt Hackel 
/* Create the mle slab cache.  Returns 0 on success, -ENOMEM on
 * failure.  (The two trailing NULLs are the constructor/destructor
 * arguments of this kernel era's kmem_cache_create().) */
int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL, NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}
5226714d8e8SKurt Hackel 
/* Tear down the mle slab cache.  The NULL check matters: the cache is
 * NULL if dlm_init_mle_cache() failed or was never called. */
void dlm_destroy_mle_cache(void)
{
	if (dlm_mle_cache)
		kmem_cache_destroy(dlm_mle_cache);
}
5286714d8e8SKurt Hackel 
5296714d8e8SKurt Hackel static void dlm_mle_release(struct kref *kref)
5306714d8e8SKurt Hackel {
5316714d8e8SKurt Hackel 	struct dlm_master_list_entry *mle;
5326714d8e8SKurt Hackel 	struct dlm_ctxt *dlm;
5336714d8e8SKurt Hackel 
5346714d8e8SKurt Hackel 	mlog_entry_void();
5356714d8e8SKurt Hackel 
5366714d8e8SKurt Hackel 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
5376714d8e8SKurt Hackel 	dlm = mle->dlm;
5386714d8e8SKurt Hackel 
5396714d8e8SKurt Hackel 	if (mle->type != DLM_MLE_MASTER) {
5406714d8e8SKurt Hackel 		mlog(0, "calling mle_release for %.*s, type %d\n",
5416714d8e8SKurt Hackel 		     mle->u.name.len, mle->u.name.name, mle->type);
5426714d8e8SKurt Hackel 	} else {
5436714d8e8SKurt Hackel 		mlog(0, "calling mle_release for %.*s, type %d\n",
5446714d8e8SKurt Hackel 		     mle->u.res->lockname.len,
5456714d8e8SKurt Hackel 		     mle->u.res->lockname.name, mle->type);
5466714d8e8SKurt Hackel 	}
5476714d8e8SKurt Hackel 	assert_spin_locked(&dlm->spinlock);
5486714d8e8SKurt Hackel 	assert_spin_locked(&dlm->master_lock);
5496714d8e8SKurt Hackel 
5506714d8e8SKurt Hackel 	/* remove from list if not already */
5516714d8e8SKurt Hackel 	if (!list_empty(&mle->list))
5526714d8e8SKurt Hackel 		list_del_init(&mle->list);
5536714d8e8SKurt Hackel 
5546714d8e8SKurt Hackel 	/* detach the mle from the domain node up/down events */
5556714d8e8SKurt Hackel 	__dlm_mle_detach_hb_events(dlm, mle);
5566714d8e8SKurt Hackel 
5576714d8e8SKurt Hackel 	/* NOTE: kfree under spinlock here.
5586714d8e8SKurt Hackel 	 * if this is bad, we can move this to a freelist. */
5596714d8e8SKurt Hackel 	kmem_cache_free(dlm_mle_cache, mle);
5606714d8e8SKurt Hackel }
5616714d8e8SKurt Hackel 
5626714d8e8SKurt Hackel 
5636714d8e8SKurt Hackel /*
5646714d8e8SKurt Hackel  * LOCK RESOURCE FUNCTIONS
5656714d8e8SKurt Hackel  */
5666714d8e8SKurt Hackel 
/* Set @res's owner to @owner and bump the matching per-domain stats
 * counter (local/unknown/remote).  res->spinlock must be held
 * (asserted).  Does NOT decrement the counter for the previous owner;
 * use dlm_change_lockres_owner() when the owner may already be set. */
static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 owner)
{
	assert_spin_locked(&res->spinlock);

	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

	if (owner == dlm->node_num)
		atomic_inc(&dlm->local_resources);
	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_inc(&dlm->unknown_resources);
	else
		atomic_inc(&dlm->remote_resources);

	res->owner = owner;
}
5846714d8e8SKurt Hackel 
/* Change @res's owner to @owner, keeping the per-domain stats counters
 * balanced: decrement the counter for the old owner, then let
 * dlm_set_lockres_owner() increment the one for the new owner.
 * No-op if the owner is unchanged.  res->spinlock must be held. */
void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res, u8 owner)
{
	assert_spin_locked(&res->spinlock);

	if (owner == res->owner)
		return;

	if (res->owner == dlm->node_num)
		atomic_dec(&dlm->local_resources);
	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_dec(&dlm->unknown_resources);
	else
		atomic_dec(&dlm->remote_resources);

	dlm_set_lockres_owner(dlm, res, owner);
}
6026714d8e8SKurt Hackel 
6036714d8e8SKurt Hackel 
/* kref release callback for a lockres: sanity-check that it has fully
 * left the dlm (unhashed, no queued locks, not dirty/recovering/purge)
 * and free both the separately allocated name and the lockres. */
static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;

	res = container_of(kref, struct dlm_lock_resource, refs);

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	/* lockname.name was kmalloc'd separately in dlm_new_lockres() */
	kfree(res->lockname.name);

	kfree(res);
}
6316714d8e8SKurt Hackel 
/* Drop one reference on @res; the last put frees it via
 * dlm_lockres_release(). */
void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}
6366714d8e8SKurt Hackel 
/* Initialize a lockres whose struct and lockname.name buffer were
 * already allocated (see dlm_new_lockres()): copy in the name, compute
 * its hash, init all lists/locks/counters, and start the resource in
 * the UNKNOWN-owner, IN_PROGRESS state.  Field-by-field on purpose --
 * see the comment below about why memset() is not used. */
static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	/* cast away const: the buffer itself is ours to fill */
	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = dlm_lockid_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);
	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;

	kref_init(&res->refs);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	memset(res->lvb, 0, DLM_LVB_LEN);
}
6786714d8e8SKurt Hackel 
6796714d8e8SKurt Hackel struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
6806714d8e8SKurt Hackel 				   const char *name,
6816714d8e8SKurt Hackel 				   unsigned int namelen)
6826714d8e8SKurt Hackel {
6836714d8e8SKurt Hackel 	struct dlm_lock_resource *res;
6846714d8e8SKurt Hackel 
6856714d8e8SKurt Hackel 	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
6866714d8e8SKurt Hackel 	if (!res)
6876714d8e8SKurt Hackel 		return NULL;
6886714d8e8SKurt Hackel 
6896714d8e8SKurt Hackel 	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
6906714d8e8SKurt Hackel 	if (!res->lockname.name) {
6916714d8e8SKurt Hackel 		kfree(res);
6926714d8e8SKurt Hackel 		return NULL;
6936714d8e8SKurt Hackel 	}
6946714d8e8SKurt Hackel 
6956714d8e8SKurt Hackel 	dlm_init_lockres(dlm, res, name, namelen);
6966714d8e8SKurt Hackel 	return res;
6976714d8e8SKurt Hackel }
6986714d8e8SKurt Hackel 
6996714d8e8SKurt Hackel /*
7006714d8e8SKurt Hackel  * lookup a lock resource by name.
7016714d8e8SKurt Hackel  * may already exist in the hashtable.
7026714d8e8SKurt Hackel  * lockid is null terminated
7036714d8e8SKurt Hackel  *
7046714d8e8SKurt Hackel  * if not, allocate enough for the lockres and for
7056714d8e8SKurt Hackel  * the temporary structure used in doing the mastering.
7066714d8e8SKurt Hackel  *
7076714d8e8SKurt Hackel  * also, do a lookup in the dlm->master_list to see
7086714d8e8SKurt Hackel  * if another node has begun mastering the same lock.
7096714d8e8SKurt Hackel  * if so, there should be a block entry in there
7106714d8e8SKurt Hackel  * for this name, and we should *not* attempt to master
7116714d8e8SKurt Hackel  * the lock here.   need to wait around for that node
7126714d8e8SKurt Hackel  * to assert_master (or die).
7136714d8e8SKurt Hackel  *
7146714d8e8SKurt Hackel  */
/* NOTE(review): may sleep -- GFP_KERNEL allocations and msleep()
 * below -- so this must not be called while holding a spinlock. */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
					  const char *lockid,
					  int flags)
{
	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int namelen, hash;
	int tries = 0;
	int bit, wait_on_recovery = 0;

	BUG_ON(!lockid);

	namelen = strlen(lockid);
	hash = dlm_lockid_hash(lockid, namelen);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "found in hash!\n");
		/* drop our speculative allocation (if any) in favor of
		 * the lockres already in the hash */
		if (res)
			dlm_lockres_put(res);
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = (struct dlm_master_list_entry *)
			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		/* the allocations may have slept: retake dlm->spinlock
		 * and re-check the hash in case someone else inserted
		 * this lockres in the meantime */
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		if (mle->type == DLM_MLE_MASTER) {
			/* a MASTER mle for a lockres not in the hash
			 * should be impossible */
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		} else if (mle->type == DLM_MLE_MIGRATION) {
			/* migration is in progress! */
			/* the good news is that we now know the
			 * "current" master (mle->master). */

			spin_unlock(&dlm->master_lock);
			assert_spin_locked(&dlm->spinlock);

			/* set the lockres owner and hash it */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res, mle->master);
			__dlm_insert_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			goto wake_waiters;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);

		/* still holding the dlm spinlock, check the recovery map
		 * to see if there are any nodes that still need to be
		 * considered.  these will not appear in the mle nodemap
		 * but they might own this lockres.  wait on them. */
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		}
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);
	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle_inuse(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	while (wait_on_recovery) {
		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
		 * so we only need to clear out the recovery map once. */
		if (dlm_is_recovery_lock(lockid, namelen)) {
			/* the $RECOVERY lock itself must be mastered even
			 * while the recovery map is non-empty */
			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
			     "must master $RECOVERY lock now\n", dlm->name);
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;
			else {
				mlog(0, "%s: waiting 500ms for heartbeat state "
				    "change\n", dlm->name);
				msleep(500);
			}
			continue;
		}

		dlm_kick_recovery_thread(dlm);
		msleep(100);
		dlm_wait_for_recovery(dlm);

		/* recheck the recovery map under the dlm spinlock */
		spin_lock(&dlm->spinlock);
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		} else
			wait_on_recovery = 0;
		spin_unlock(&dlm->spinlock);
	}

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

	/* send master requests to every node in the mle's vote map */
redo_request:
	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			if (mle->master <= nodenum)
				break;
			/* if our master request has not reached the master
			 * yet, keep going until it does.  this is how the
			 * master will know that asserts are needed back to
			 * the lower nodes. */
			mlog(0, "%s:%.*s: requests only up to %u but master "
			     "is %u, keep going\n", dlm->name, namelen,
			     lockid, nodenum, mle->master);
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		mlog(0, "%s:%.*s: node map changed, redo the "
		     "master request now, blocked=%d\n",
		     dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			mlog(ML_ERROR, "%s:%.*s: spinning on "
			     "dlm_wait_for_lock_mastery, blocked=%d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			/* dlm_print_one_mle(mle); */
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "lockres mastered by %u\n", res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle_inuse(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}
9446714d8e8SKurt Hackel 
9456714d8e8SKurt Hackel 
9466714d8e8SKurt Hackel #define DLM_MASTERY_TIMEOUT_MS   5000
9476714d8e8SKurt Hackel 
/*
 * Wait until mastery of @res is resolved: either another node asserts
 * master, or this node has the lowest node number among the remaining
 * candidates and asserts master itself.
 *
 * @blocked is in/out: whether we are blocked waiting on another node's
 * mastery; it is updated if the mle type changes during a restart.
 *
 * Returns 0 once res->owner has been set, or <0 if the node map
 * changed and the caller must redo its master requests.
 */
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
		     res->lockname.len, res->lockname.name, res->owner);
		spin_unlock(&res->spinlock);
		/* this will cause the master to re-assert across
		 * the whole cluster, freeing up mles */
		ret = dlm_do_master_request(mle, res->owner);
		if (ret < 0) {
			/* give recovery a chance to run */
			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
			msleep(500);
			goto recheck;
		}
		ret = 0;
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	/* map_changed: the set of live nodes no longer matches the set
	 * we asked; voting_done: every node we asked has responded */
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			     sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			/* find_next_bit yields the lowest candidate in
			 * maybe_map; if our node number is <= it, we win */
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
			 	 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}

	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		/*
		if (atomic_read(&mle->mle_refs.refcount) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			atomic_read(&mle->mle_refs.refcount),
			res->lockname.len, res->lockname.name);
		*/
		/* woken is set and mle->wq signalled when a response or
		 * assert arrives; otherwise we time out and recheck */
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "waiting again\n");
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;   /* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res->lockname.name,
					   res->lockname.len, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}
10886714d8e8SKurt Hackel 
/* Iterator over the nodes whose up/down state differs between two
 * node bitmaps; see dlm_bitmap_diff_iter_init()/_next(). */
struct dlm_bitmap_diff_iter
{
	int curnode;		/* last node returned; -1 before the first */
	unsigned long *orig_bm;	/* earlier snapshot of the node map */
	unsigned long *cur_bm;	/* current node map */
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];	/* symmetric difference */
};
10966714d8e8SKurt Hackel 
/* How a node's membership changed between the original and the
 * current bitmap. */
enum dlm_node_state_change
{
	NODE_DOWN = -1,		/* set in original map, clear now */
	NODE_NO_CHANGE = 0,
	NODE_UP			/* clear in original map, set now */
};
11036714d8e8SKurt Hackel 
11046714d8e8SKurt Hackel static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
11056714d8e8SKurt Hackel 				      unsigned long *orig_bm,
11066714d8e8SKurt Hackel 				      unsigned long *cur_bm)
11076714d8e8SKurt Hackel {
11086714d8e8SKurt Hackel 	unsigned long p1, p2;
11096714d8e8SKurt Hackel 	int i;
11106714d8e8SKurt Hackel 
11116714d8e8SKurt Hackel 	iter->curnode = -1;
11126714d8e8SKurt Hackel 	iter->orig_bm = orig_bm;
11136714d8e8SKurt Hackel 	iter->cur_bm = cur_bm;
11146714d8e8SKurt Hackel 
11156714d8e8SKurt Hackel 	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
11166714d8e8SKurt Hackel        		p1 = *(iter->orig_bm + i);
11176714d8e8SKurt Hackel 	       	p2 = *(iter->cur_bm + i);
11186714d8e8SKurt Hackel 		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
11196714d8e8SKurt Hackel 	}
11206714d8e8SKurt Hackel }
11216714d8e8SKurt Hackel 
11226714d8e8SKurt Hackel static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
11236714d8e8SKurt Hackel 				     enum dlm_node_state_change *state)
11246714d8e8SKurt Hackel {
11256714d8e8SKurt Hackel 	int bit;
11266714d8e8SKurt Hackel 
11276714d8e8SKurt Hackel 	if (iter->curnode >= O2NM_MAX_NODES)
11286714d8e8SKurt Hackel 		return -ENOENT;
11296714d8e8SKurt Hackel 
11306714d8e8SKurt Hackel 	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
11316714d8e8SKurt Hackel 			    iter->curnode+1);
11326714d8e8SKurt Hackel 	if (bit >= O2NM_MAX_NODES) {
11336714d8e8SKurt Hackel 		iter->curnode = O2NM_MAX_NODES;
11346714d8e8SKurt Hackel 		return -ENOENT;
11356714d8e8SKurt Hackel 	}
11366714d8e8SKurt Hackel 
11376714d8e8SKurt Hackel 	/* if it was there in the original then this node died */
11386714d8e8SKurt Hackel 	if (test_bit(bit, iter->orig_bm))
11396714d8e8SKurt Hackel 		*state = NODE_DOWN;
11406714d8e8SKurt Hackel 	else
11416714d8e8SKurt Hackel 		*state = NODE_UP;
11426714d8e8SKurt Hackel 
11436714d8e8SKurt Hackel 	iter->curnode = bit;
11446714d8e8SKurt Hackel 	return bit;
11456714d8e8SKurt Hackel }
11466714d8e8SKurt Hackel 
11476714d8e8SKurt Hackel 
11486714d8e8SKurt Hackel static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
11496714d8e8SKurt Hackel 				    struct dlm_lock_resource *res,
11506714d8e8SKurt Hackel 				    struct dlm_master_list_entry *mle,
11516714d8e8SKurt Hackel 				    int blocked)
11526714d8e8SKurt Hackel {
11536714d8e8SKurt Hackel 	struct dlm_bitmap_diff_iter bdi;
11546714d8e8SKurt Hackel 	enum dlm_node_state_change sc;
11556714d8e8SKurt Hackel 	int node;
11566714d8e8SKurt Hackel 	int ret = 0;
11576714d8e8SKurt Hackel 
11586714d8e8SKurt Hackel 	mlog(0, "something happened such that the "
11596714d8e8SKurt Hackel 	     "master process may need to be restarted!\n");
11606714d8e8SKurt Hackel 
11616714d8e8SKurt Hackel 	assert_spin_locked(&mle->spinlock);
11626714d8e8SKurt Hackel 
11636714d8e8SKurt Hackel 	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
11646714d8e8SKurt Hackel 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
11656714d8e8SKurt Hackel 	while (node >= 0) {
11666714d8e8SKurt Hackel 		if (sc == NODE_UP) {
1167e2faea4cSKurt Hackel 			/* a node came up.  clear any old vote from
1168e2faea4cSKurt Hackel 			 * the response map and set it in the vote map
1169e2faea4cSKurt Hackel 			 * then restart the mastery. */
1170e2faea4cSKurt Hackel 			mlog(ML_NOTICE, "node %d up while restarting\n", node);
11716714d8e8SKurt Hackel 
11726714d8e8SKurt Hackel 			/* redo the master request, but only for the new node */
11736714d8e8SKurt Hackel 			mlog(0, "sending request to new node\n");
11746714d8e8SKurt Hackel 			clear_bit(node, mle->response_map);
11756714d8e8SKurt Hackel 			set_bit(node, mle->vote_map);
11766714d8e8SKurt Hackel 		} else {
11776714d8e8SKurt Hackel 			mlog(ML_ERROR, "node down! %d\n", node);
11786714d8e8SKurt Hackel 
11796714d8e8SKurt Hackel 			/* if the node wasn't involved in mastery skip it,
11806714d8e8SKurt Hackel 			 * but clear it out from the maps so that it will
11816714d8e8SKurt Hackel 			 * not affect mastery of this lockres */
11826714d8e8SKurt Hackel 			clear_bit(node, mle->response_map);
11836714d8e8SKurt Hackel 			clear_bit(node, mle->vote_map);
11846714d8e8SKurt Hackel 			if (!test_bit(node, mle->maybe_map))
11856714d8e8SKurt Hackel 				goto next;
11866714d8e8SKurt Hackel 
11876714d8e8SKurt Hackel 			/* if we're already blocked on lock mastery, and the
11886714d8e8SKurt Hackel 			 * dead node wasn't the expected master, or there is
11896714d8e8SKurt Hackel 			 * another node in the maybe_map, keep waiting */
11906714d8e8SKurt Hackel 			if (blocked) {
11916714d8e8SKurt Hackel 				int lowest = find_next_bit(mle->maybe_map,
11926714d8e8SKurt Hackel 						       O2NM_MAX_NODES, 0);
11936714d8e8SKurt Hackel 
11946714d8e8SKurt Hackel 				/* act like it was never there */
11956714d8e8SKurt Hackel 				clear_bit(node, mle->maybe_map);
11966714d8e8SKurt Hackel 
11976714d8e8SKurt Hackel 			       	if (node != lowest)
11986714d8e8SKurt Hackel 					goto next;
11996714d8e8SKurt Hackel 
12006714d8e8SKurt Hackel 				mlog(ML_ERROR, "expected master %u died while "
12016714d8e8SKurt Hackel 				     "this node was blocked waiting on it!\n",
12026714d8e8SKurt Hackel 				     node);
12036714d8e8SKurt Hackel 				lowest = find_next_bit(mle->maybe_map,
12046714d8e8SKurt Hackel 						       O2NM_MAX_NODES,
12056714d8e8SKurt Hackel 						       lowest+1);
12066714d8e8SKurt Hackel 				if (lowest < O2NM_MAX_NODES) {
12076714d8e8SKurt Hackel 					mlog(0, "still blocked. waiting "
12086714d8e8SKurt Hackel 					     "on %u now\n", lowest);
12096714d8e8SKurt Hackel 					goto next;
12106714d8e8SKurt Hackel 				}
12116714d8e8SKurt Hackel 
12126714d8e8SKurt Hackel 				/* mle is an MLE_BLOCK, but there is now
12136714d8e8SKurt Hackel 				 * nothing left to block on.  we need to return
12146714d8e8SKurt Hackel 				 * all the way back out and try again with
12156714d8e8SKurt Hackel 				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
12166714d8e8SKurt Hackel 				 * has already run, so the mle refcount is ok */
12176714d8e8SKurt Hackel 				mlog(0, "no longer blocking. we can "
12186714d8e8SKurt Hackel 				     "try to master this here\n");
12196714d8e8SKurt Hackel 				mle->type = DLM_MLE_MASTER;
12206714d8e8SKurt Hackel 				memset(mle->maybe_map, 0,
12216714d8e8SKurt Hackel 				       sizeof(mle->maybe_map));
12226714d8e8SKurt Hackel 				memset(mle->response_map, 0,
12236714d8e8SKurt Hackel 				       sizeof(mle->maybe_map));
12246714d8e8SKurt Hackel 				memcpy(mle->vote_map, mle->node_map,
12256714d8e8SKurt Hackel 				       sizeof(mle->node_map));
12266714d8e8SKurt Hackel 				mle->u.res = res;
12276714d8e8SKurt Hackel 				set_bit(dlm->node_num, mle->maybe_map);
12286714d8e8SKurt Hackel 
12296714d8e8SKurt Hackel 				ret = -EAGAIN;
12306714d8e8SKurt Hackel 				goto next;
12316714d8e8SKurt Hackel 			}
12326714d8e8SKurt Hackel 
12336714d8e8SKurt Hackel 			clear_bit(node, mle->maybe_map);
12346714d8e8SKurt Hackel 			if (node > dlm->node_num)
12356714d8e8SKurt Hackel 				goto next;
12366714d8e8SKurt Hackel 
12376714d8e8SKurt Hackel 			mlog(0, "dead node in map!\n");
12386714d8e8SKurt Hackel 			/* yuck. go back and re-contact all nodes
12396714d8e8SKurt Hackel 			 * in the vote_map, removing this node. */
12406714d8e8SKurt Hackel 			memset(mle->response_map, 0,
12416714d8e8SKurt Hackel 			       sizeof(mle->response_map));
12426714d8e8SKurt Hackel 		}
12436714d8e8SKurt Hackel 		ret = -EAGAIN;
12446714d8e8SKurt Hackel next:
12456714d8e8SKurt Hackel 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
12466714d8e8SKurt Hackel 	}
12476714d8e8SKurt Hackel 	return ret;
12486714d8e8SKurt Hackel }
12496714d8e8SKurt Hackel 
12506714d8e8SKurt Hackel 
12516714d8e8SKurt Hackel /*
12526714d8e8SKurt Hackel  * DLM_MASTER_REQUEST_MSG
12536714d8e8SKurt Hackel  *
12546714d8e8SKurt Hackel  * returns: 0 on success,
12556714d8e8SKurt Hackel  *          -errno on a network error
12566714d8e8SKurt Hackel  *
12576714d8e8SKurt Hackel  * on error, the caller should assume the target node is "dead"
12586714d8e8SKurt Hackel  *
12596714d8e8SKurt Hackel  */
12606714d8e8SKurt Hackel 
12616714d8e8SKurt Hackel static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
12626714d8e8SKurt Hackel {
12636714d8e8SKurt Hackel 	struct dlm_ctxt *dlm = mle->dlm;
12646714d8e8SKurt Hackel 	struct dlm_master_request request;
12656714d8e8SKurt Hackel 	int ret, response=0, resend;
12666714d8e8SKurt Hackel 
12676714d8e8SKurt Hackel 	memset(&request, 0, sizeof(request));
12686714d8e8SKurt Hackel 	request.node_idx = dlm->node_num;
12696714d8e8SKurt Hackel 
12706714d8e8SKurt Hackel 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
12716714d8e8SKurt Hackel 
12726714d8e8SKurt Hackel 	if (mle->type != DLM_MLE_MASTER) {
12736714d8e8SKurt Hackel 		request.namelen = mle->u.name.len;
12746714d8e8SKurt Hackel 		memcpy(request.name, mle->u.name.name, request.namelen);
12756714d8e8SKurt Hackel 	} else {
12766714d8e8SKurt Hackel 		request.namelen = mle->u.res->lockname.len;
12776714d8e8SKurt Hackel 		memcpy(request.name, mle->u.res->lockname.name,
12786714d8e8SKurt Hackel 			request.namelen);
12796714d8e8SKurt Hackel 	}
12806714d8e8SKurt Hackel 
12816714d8e8SKurt Hackel again:
12826714d8e8SKurt Hackel 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
12836714d8e8SKurt Hackel 				 sizeof(request), to, &response);
12846714d8e8SKurt Hackel 	if (ret < 0)  {
12856714d8e8SKurt Hackel 		if (ret == -ESRCH) {
12866714d8e8SKurt Hackel 			/* should never happen */
12876714d8e8SKurt Hackel 			mlog(ML_ERROR, "TCP stack not ready!\n");
12886714d8e8SKurt Hackel 			BUG();
12896714d8e8SKurt Hackel 		} else if (ret == -EINVAL) {
12906714d8e8SKurt Hackel 			mlog(ML_ERROR, "bad args passed to o2net!\n");
12916714d8e8SKurt Hackel 			BUG();
12926714d8e8SKurt Hackel 		} else if (ret == -ENOMEM) {
12936714d8e8SKurt Hackel 			mlog(ML_ERROR, "out of memory while trying to send "
12946714d8e8SKurt Hackel 			     "network message!  retrying\n");
12956714d8e8SKurt Hackel 			/* this is totally crude */
12966714d8e8SKurt Hackel 			msleep(50);
12976714d8e8SKurt Hackel 			goto again;
12986714d8e8SKurt Hackel 		} else if (!dlm_is_host_down(ret)) {
12996714d8e8SKurt Hackel 			/* not a network error. bad. */
13006714d8e8SKurt Hackel 			mlog_errno(ret);
13016714d8e8SKurt Hackel 			mlog(ML_ERROR, "unhandled error!");
13026714d8e8SKurt Hackel 			BUG();
13036714d8e8SKurt Hackel 		}
13046714d8e8SKurt Hackel 		/* all other errors should be network errors,
13056714d8e8SKurt Hackel 		 * and likely indicate node death */
13066714d8e8SKurt Hackel 		mlog(ML_ERROR, "link to %d went down!\n", to);
13076714d8e8SKurt Hackel 		goto out;
13086714d8e8SKurt Hackel 	}
13096714d8e8SKurt Hackel 
13106714d8e8SKurt Hackel 	ret = 0;
13116714d8e8SKurt Hackel 	resend = 0;
13126714d8e8SKurt Hackel 	spin_lock(&mle->spinlock);
13136714d8e8SKurt Hackel 	switch (response) {
13146714d8e8SKurt Hackel 		case DLM_MASTER_RESP_YES:
13156714d8e8SKurt Hackel 			set_bit(to, mle->response_map);
13166714d8e8SKurt Hackel 			mlog(0, "node %u is the master, response=YES\n", to);
13176714d8e8SKurt Hackel 			mle->master = to;
13186714d8e8SKurt Hackel 			break;
13196714d8e8SKurt Hackel 		case DLM_MASTER_RESP_NO:
13206714d8e8SKurt Hackel 			mlog(0, "node %u not master, response=NO\n", to);
13216714d8e8SKurt Hackel 			set_bit(to, mle->response_map);
13226714d8e8SKurt Hackel 			break;
13236714d8e8SKurt Hackel 		case DLM_MASTER_RESP_MAYBE:
13246714d8e8SKurt Hackel 			mlog(0, "node %u not master, response=MAYBE\n", to);
13256714d8e8SKurt Hackel 			set_bit(to, mle->response_map);
13266714d8e8SKurt Hackel 			set_bit(to, mle->maybe_map);
13276714d8e8SKurt Hackel 			break;
13286714d8e8SKurt Hackel 		case DLM_MASTER_RESP_ERROR:
13296714d8e8SKurt Hackel 			mlog(0, "node %u hit an error, resending\n", to);
13306714d8e8SKurt Hackel 			resend = 1;
13316714d8e8SKurt Hackel 			response = 0;
13326714d8e8SKurt Hackel 			break;
13336714d8e8SKurt Hackel 		default:
13346714d8e8SKurt Hackel 			mlog(ML_ERROR, "bad response! %u\n", response);
13356714d8e8SKurt Hackel 			BUG();
13366714d8e8SKurt Hackel 	}
13376714d8e8SKurt Hackel 	spin_unlock(&mle->spinlock);
13386714d8e8SKurt Hackel 	if (resend) {
13396714d8e8SKurt Hackel 		/* this is also totally crude */
13406714d8e8SKurt Hackel 		msleep(50);
13416714d8e8SKurt Hackel 		goto again;
13426714d8e8SKurt Hackel 	}
13436714d8e8SKurt Hackel 
13446714d8e8SKurt Hackel out:
13456714d8e8SKurt Hackel 	return ret;
13466714d8e8SKurt Hackel }
13476714d8e8SKurt Hackel 
13486714d8e8SKurt Hackel /*
13496714d8e8SKurt Hackel  * locks that can be taken here:
13506714d8e8SKurt Hackel  * dlm->spinlock
13516714d8e8SKurt Hackel  * res->spinlock
13526714d8e8SKurt Hackel  * mle->spinlock
13536714d8e8SKurt Hackel  * dlm->master_list
13546714d8e8SKurt Hackel  *
13556714d8e8SKurt Hackel  * if possible, TRIM THIS DOWN!!!
13566714d8e8SKurt Hackel  */
13576714d8e8SKurt Hackel int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
13586714d8e8SKurt Hackel {
13596714d8e8SKurt Hackel 	u8 response = DLM_MASTER_RESP_MAYBE;
13606714d8e8SKurt Hackel 	struct dlm_ctxt *dlm = data;
13619c6510a5SKurt Hackel 	struct dlm_lock_resource *res = NULL;
13626714d8e8SKurt Hackel 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
13636714d8e8SKurt Hackel 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
13646714d8e8SKurt Hackel 	char *name;
1365a3d33291SMark Fasheh 	unsigned int namelen, hash;
13666714d8e8SKurt Hackel 	int found, ret;
13676714d8e8SKurt Hackel 	int set_maybe;
13689c6510a5SKurt Hackel 	int dispatch_assert = 0;
13696714d8e8SKurt Hackel 
13706714d8e8SKurt Hackel 	if (!dlm_grab(dlm))
13716714d8e8SKurt Hackel 		return DLM_MASTER_RESP_NO;
13726714d8e8SKurt Hackel 
13736714d8e8SKurt Hackel 	if (!dlm_domain_fully_joined(dlm)) {
13746714d8e8SKurt Hackel 		response = DLM_MASTER_RESP_NO;
13756714d8e8SKurt Hackel 		goto send_response;
13766714d8e8SKurt Hackel 	}
13776714d8e8SKurt Hackel 
13786714d8e8SKurt Hackel 	name = request->name;
13796714d8e8SKurt Hackel 	namelen = request->namelen;
1380a3d33291SMark Fasheh 	hash = dlm_lockid_hash(name, namelen);
13816714d8e8SKurt Hackel 
13826714d8e8SKurt Hackel 	if (namelen > DLM_LOCKID_NAME_MAX) {
13836714d8e8SKurt Hackel 		response = DLM_IVBUFLEN;
13846714d8e8SKurt Hackel 		goto send_response;
13856714d8e8SKurt Hackel 	}
13866714d8e8SKurt Hackel 
13876714d8e8SKurt Hackel way_up_top:
13886714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
1389a3d33291SMark Fasheh 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
13906714d8e8SKurt Hackel 	if (res) {
13916714d8e8SKurt Hackel 		spin_unlock(&dlm->spinlock);
13926714d8e8SKurt Hackel 
13936714d8e8SKurt Hackel 		/* take care of the easy cases up front */
13946714d8e8SKurt Hackel 		spin_lock(&res->spinlock);
13956714d8e8SKurt Hackel 		if (res->state & DLM_LOCK_RES_RECOVERING) {
13966714d8e8SKurt Hackel 			spin_unlock(&res->spinlock);
13976714d8e8SKurt Hackel 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
13986714d8e8SKurt Hackel 			     "being recovered\n");
13996714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_ERROR;
14006714d8e8SKurt Hackel 			if (mle)
14016714d8e8SKurt Hackel 				kmem_cache_free(dlm_mle_cache, mle);
14026714d8e8SKurt Hackel 			goto send_response;
14036714d8e8SKurt Hackel 		}
14046714d8e8SKurt Hackel 
14056714d8e8SKurt Hackel 		if (res->owner == dlm->node_num) {
14066714d8e8SKurt Hackel 			spin_unlock(&res->spinlock);
14076714d8e8SKurt Hackel 			// mlog(0, "this node is the master\n");
14086714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_YES;
14096714d8e8SKurt Hackel 			if (mle)
14106714d8e8SKurt Hackel 				kmem_cache_free(dlm_mle_cache, mle);
14116714d8e8SKurt Hackel 
14126714d8e8SKurt Hackel 			/* this node is the owner.
14136714d8e8SKurt Hackel 			 * there is some extra work that needs to
14146714d8e8SKurt Hackel 			 * happen now.  the requesting node has
14156714d8e8SKurt Hackel 			 * caused all nodes up to this one to
14166714d8e8SKurt Hackel 			 * create mles.  this node now needs to
14176714d8e8SKurt Hackel 			 * go back and clean those up. */
14189c6510a5SKurt Hackel 			dispatch_assert = 1;
14196714d8e8SKurt Hackel 			goto send_response;
14206714d8e8SKurt Hackel 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
14216714d8e8SKurt Hackel 			spin_unlock(&res->spinlock);
14226714d8e8SKurt Hackel 			// mlog(0, "node %u is the master\n", res->owner);
14236714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_NO;
14246714d8e8SKurt Hackel 			if (mle)
14256714d8e8SKurt Hackel 				kmem_cache_free(dlm_mle_cache, mle);
14266714d8e8SKurt Hackel 			goto send_response;
14276714d8e8SKurt Hackel 		}
14286714d8e8SKurt Hackel 
14296714d8e8SKurt Hackel 		/* ok, there is no owner.  either this node is
14306714d8e8SKurt Hackel 		 * being blocked, or it is actively trying to
14316714d8e8SKurt Hackel 		 * master this lock. */
14326714d8e8SKurt Hackel 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
14336714d8e8SKurt Hackel 			mlog(ML_ERROR, "lock with no owner should be "
14346714d8e8SKurt Hackel 			     "in-progress!\n");
14356714d8e8SKurt Hackel 			BUG();
14366714d8e8SKurt Hackel 		}
14376714d8e8SKurt Hackel 
14386714d8e8SKurt Hackel 		// mlog(0, "lockres is in progress...\n");
14396714d8e8SKurt Hackel 		spin_lock(&dlm->master_lock);
14406714d8e8SKurt Hackel 		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
14416714d8e8SKurt Hackel 		if (!found) {
14426714d8e8SKurt Hackel 			mlog(ML_ERROR, "no mle found for this lock!\n");
14436714d8e8SKurt Hackel 			BUG();
14446714d8e8SKurt Hackel 		}
14456714d8e8SKurt Hackel 		set_maybe = 1;
14466714d8e8SKurt Hackel 		spin_lock(&tmpmle->spinlock);
14476714d8e8SKurt Hackel 		if (tmpmle->type == DLM_MLE_BLOCK) {
14486714d8e8SKurt Hackel 			// mlog(0, "this node is waiting for "
14496714d8e8SKurt Hackel 			// "lockres to be mastered\n");
14506714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_NO;
14516714d8e8SKurt Hackel 		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
14526714d8e8SKurt Hackel 			mlog(0, "node %u is master, but trying to migrate to "
14536714d8e8SKurt Hackel 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
14546714d8e8SKurt Hackel 			if (tmpmle->master == dlm->node_num) {
14556714d8e8SKurt Hackel 				response = DLM_MASTER_RESP_YES;
14566714d8e8SKurt Hackel 				mlog(ML_ERROR, "no owner on lockres, but this "
14576714d8e8SKurt Hackel 				     "node is trying to migrate it to %u?!\n",
14586714d8e8SKurt Hackel 				     tmpmle->new_master);
14596714d8e8SKurt Hackel 				BUG();
14606714d8e8SKurt Hackel 			} else {
14616714d8e8SKurt Hackel 				/* the real master can respond on its own */
14626714d8e8SKurt Hackel 				response = DLM_MASTER_RESP_NO;
14636714d8e8SKurt Hackel 			}
14646714d8e8SKurt Hackel 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
14656714d8e8SKurt Hackel 			set_maybe = 0;
14669c6510a5SKurt Hackel 			if (tmpmle->master == dlm->node_num) {
14676714d8e8SKurt Hackel 				response = DLM_MASTER_RESP_YES;
14689c6510a5SKurt Hackel 				/* this node will be the owner.
14699c6510a5SKurt Hackel 				 * go back and clean the mles on any
14709c6510a5SKurt Hackel 				 * other nodes */
14719c6510a5SKurt Hackel 				dispatch_assert = 1;
14729c6510a5SKurt Hackel 			} else
14736714d8e8SKurt Hackel 				response = DLM_MASTER_RESP_NO;
14746714d8e8SKurt Hackel 		} else {
14756714d8e8SKurt Hackel 			// mlog(0, "this node is attempting to "
14766714d8e8SKurt Hackel 			// "master lockres\n");
14776714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_MAYBE;
14786714d8e8SKurt Hackel 		}
14796714d8e8SKurt Hackel 		if (set_maybe)
14806714d8e8SKurt Hackel 			set_bit(request->node_idx, tmpmle->maybe_map);
14816714d8e8SKurt Hackel 		spin_unlock(&tmpmle->spinlock);
14826714d8e8SKurt Hackel 
14836714d8e8SKurt Hackel 		spin_unlock(&dlm->master_lock);
14846714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
14856714d8e8SKurt Hackel 
14866714d8e8SKurt Hackel 		/* keep the mle attached to heartbeat events */
14876714d8e8SKurt Hackel 		dlm_put_mle(tmpmle);
14886714d8e8SKurt Hackel 		if (mle)
14896714d8e8SKurt Hackel 			kmem_cache_free(dlm_mle_cache, mle);
14906714d8e8SKurt Hackel 		goto send_response;
14916714d8e8SKurt Hackel 	}
14926714d8e8SKurt Hackel 
14936714d8e8SKurt Hackel 	/*
14946714d8e8SKurt Hackel 	 * lockres doesn't exist on this node
14956714d8e8SKurt Hackel 	 * if there is an MLE_BLOCK, return NO
14966714d8e8SKurt Hackel 	 * if there is an MLE_MASTER, return MAYBE
14976714d8e8SKurt Hackel 	 * otherwise, add an MLE_BLOCK, return NO
14986714d8e8SKurt Hackel 	 */
14996714d8e8SKurt Hackel 	spin_lock(&dlm->master_lock);
15006714d8e8SKurt Hackel 	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
15016714d8e8SKurt Hackel 	if (!found) {
15026714d8e8SKurt Hackel 		/* this lockid has never been seen on this node yet */
15036714d8e8SKurt Hackel 		// mlog(0, "no mle found\n");
15046714d8e8SKurt Hackel 		if (!mle) {
15056714d8e8SKurt Hackel 			spin_unlock(&dlm->master_lock);
15066714d8e8SKurt Hackel 			spin_unlock(&dlm->spinlock);
15076714d8e8SKurt Hackel 
15086714d8e8SKurt Hackel 			mle = (struct dlm_master_list_entry *)
15096714d8e8SKurt Hackel 				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
15106714d8e8SKurt Hackel 			if (!mle) {
15116714d8e8SKurt Hackel 				response = DLM_MASTER_RESP_ERROR;
15129c6510a5SKurt Hackel 				mlog_errno(-ENOMEM);
15136714d8e8SKurt Hackel 				goto send_response;
15146714d8e8SKurt Hackel 			}
15156714d8e8SKurt Hackel 			spin_lock(&dlm->spinlock);
15166714d8e8SKurt Hackel 			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
15176714d8e8SKurt Hackel 					 name, namelen);
15186714d8e8SKurt Hackel 			spin_unlock(&dlm->spinlock);
15196714d8e8SKurt Hackel 			goto way_up_top;
15206714d8e8SKurt Hackel 		}
15216714d8e8SKurt Hackel 
15226714d8e8SKurt Hackel 		// mlog(0, "this is second time thru, already allocated, "
15236714d8e8SKurt Hackel 		// "add the block.\n");
15246714d8e8SKurt Hackel 		set_bit(request->node_idx, mle->maybe_map);
15256714d8e8SKurt Hackel 		list_add(&mle->list, &dlm->master_list);
15266714d8e8SKurt Hackel 		response = DLM_MASTER_RESP_NO;
15276714d8e8SKurt Hackel 	} else {
15286714d8e8SKurt Hackel 		// mlog(0, "mle was found\n");
15296714d8e8SKurt Hackel 		set_maybe = 1;
15306714d8e8SKurt Hackel 		spin_lock(&tmpmle->spinlock);
15319c6510a5SKurt Hackel 		if (tmpmle->master == dlm->node_num) {
15329c6510a5SKurt Hackel 			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
15339c6510a5SKurt Hackel 			BUG();
15349c6510a5SKurt Hackel 		}
15356714d8e8SKurt Hackel 		if (tmpmle->type == DLM_MLE_BLOCK)
15366714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_NO;
15376714d8e8SKurt Hackel 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
15386714d8e8SKurt Hackel 			mlog(0, "migration mle was found (%u->%u)\n",
15396714d8e8SKurt Hackel 			     tmpmle->master, tmpmle->new_master);
15406714d8e8SKurt Hackel 			/* real master can respond on its own */
15416714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_NO;
15426714d8e8SKurt Hackel 		} else
15436714d8e8SKurt Hackel 			response = DLM_MASTER_RESP_MAYBE;
15446714d8e8SKurt Hackel 		if (set_maybe)
15456714d8e8SKurt Hackel 			set_bit(request->node_idx, tmpmle->maybe_map);
15466714d8e8SKurt Hackel 		spin_unlock(&tmpmle->spinlock);
15476714d8e8SKurt Hackel 	}
15486714d8e8SKurt Hackel 	spin_unlock(&dlm->master_lock);
15496714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
15506714d8e8SKurt Hackel 
15516714d8e8SKurt Hackel 	if (found) {
15526714d8e8SKurt Hackel 		/* keep the mle attached to heartbeat events */
15536714d8e8SKurt Hackel 		dlm_put_mle(tmpmle);
15546714d8e8SKurt Hackel 	}
15556714d8e8SKurt Hackel send_response:
15569c6510a5SKurt Hackel 
15579c6510a5SKurt Hackel 	if (dispatch_assert) {
15589c6510a5SKurt Hackel 		if (response != DLM_MASTER_RESP_YES)
15599c6510a5SKurt Hackel 			mlog(ML_ERROR, "invalid response %d\n", response);
15609c6510a5SKurt Hackel 		if (!res) {
15619c6510a5SKurt Hackel 			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
15629c6510a5SKurt Hackel 			BUG();
15639c6510a5SKurt Hackel 		}
15649c6510a5SKurt Hackel 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
15659c6510a5SKurt Hackel 			     dlm->node_num, res->lockname.len, res->lockname.name);
15669c6510a5SKurt Hackel 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
15679c6510a5SKurt Hackel 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
15689c6510a5SKurt Hackel 		if (ret < 0) {
15699c6510a5SKurt Hackel 			mlog(ML_ERROR, "failed to dispatch assert master work\n");
15709c6510a5SKurt Hackel 			response = DLM_MASTER_RESP_ERROR;
15719c6510a5SKurt Hackel 		}
15729c6510a5SKurt Hackel 	}
15739c6510a5SKurt Hackel 
15746714d8e8SKurt Hackel 	dlm_put(dlm);
15756714d8e8SKurt Hackel 	return response;
15766714d8e8SKurt Hackel }
15776714d8e8SKurt Hackel 
15786714d8e8SKurt Hackel /*
15796714d8e8SKurt Hackel  * DLM_ASSERT_MASTER_MSG
15806714d8e8SKurt Hackel  */
15816714d8e8SKurt Hackel 
15826714d8e8SKurt Hackel 
15836714d8e8SKurt Hackel /*
15846714d8e8SKurt Hackel  * NOTE: this can be used for debugging
15856714d8e8SKurt Hackel  * can periodically run all locks owned by this node
15866714d8e8SKurt Hackel  * and re-assert across the cluster...
15876714d8e8SKurt Hackel  */
15886714d8e8SKurt Hackel static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
15896714d8e8SKurt Hackel 				unsigned int namelen, void *nodemap,
15906714d8e8SKurt Hackel 				u32 flags)
15916714d8e8SKurt Hackel {
15926714d8e8SKurt Hackel 	struct dlm_assert_master assert;
15936714d8e8SKurt Hackel 	int to, tmpret;
15946714d8e8SKurt Hackel 	struct dlm_node_iter iter;
15956714d8e8SKurt Hackel 	int ret = 0;
15969c6510a5SKurt Hackel 	int reassert;
15976714d8e8SKurt Hackel 
15986714d8e8SKurt Hackel 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
15999c6510a5SKurt Hackel again:
16009c6510a5SKurt Hackel 	reassert = 0;
16016714d8e8SKurt Hackel 
16026714d8e8SKurt Hackel 	/* note that if this nodemap is empty, it returns 0 */
16036714d8e8SKurt Hackel 	dlm_node_iter_init(nodemap, &iter);
16046714d8e8SKurt Hackel 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
16056714d8e8SKurt Hackel 		int r = 0;
16066714d8e8SKurt Hackel 		mlog(0, "sending assert master to %d (%.*s)\n", to,
16076714d8e8SKurt Hackel 		     namelen, lockname);
16086714d8e8SKurt Hackel 		memset(&assert, 0, sizeof(assert));
16096714d8e8SKurt Hackel 		assert.node_idx = dlm->node_num;
16106714d8e8SKurt Hackel 		assert.namelen = namelen;
16116714d8e8SKurt Hackel 		memcpy(assert.name, lockname, namelen);
16126714d8e8SKurt Hackel 		assert.flags = cpu_to_be32(flags);
16136714d8e8SKurt Hackel 
16146714d8e8SKurt Hackel 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
16156714d8e8SKurt Hackel 					    &assert, sizeof(assert), to, &r);
16166714d8e8SKurt Hackel 		if (tmpret < 0) {
16176714d8e8SKurt Hackel 			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
16186714d8e8SKurt Hackel 			if (!dlm_is_host_down(tmpret)) {
16196714d8e8SKurt Hackel 				mlog(ML_ERROR, "unhandled error!\n");
16206714d8e8SKurt Hackel 				BUG();
16216714d8e8SKurt Hackel 			}
16226714d8e8SKurt Hackel 			/* a node died.  finish out the rest of the nodes. */
16236714d8e8SKurt Hackel 			mlog(ML_ERROR, "link to %d went down!\n", to);
16246714d8e8SKurt Hackel 			/* any nonzero status return will do */
16256714d8e8SKurt Hackel 			ret = tmpret;
16266714d8e8SKurt Hackel 		} else if (r < 0) {
16276714d8e8SKurt Hackel 			/* ok, something horribly messed.  kill thyself. */
16286714d8e8SKurt Hackel 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
16296714d8e8SKurt Hackel 			     "got %d.\n", namelen, lockname, to, r);
16306714d8e8SKurt Hackel 			dlm_dump_lock_resources(dlm);
16316714d8e8SKurt Hackel 			BUG();
16329c6510a5SKurt Hackel 		} else if (r == EAGAIN) {
16339c6510a5SKurt Hackel 			mlog(0, "%.*s: node %u create mles on other "
16349c6510a5SKurt Hackel 			     "nodes and requests a re-assert\n",
16359c6510a5SKurt Hackel 			     namelen, lockname, to);
16369c6510a5SKurt Hackel 			reassert = 1;
16376714d8e8SKurt Hackel 		}
16386714d8e8SKurt Hackel 	}
16396714d8e8SKurt Hackel 
16409c6510a5SKurt Hackel 	if (reassert)
16419c6510a5SKurt Hackel 		goto again;
16429c6510a5SKurt Hackel 
16436714d8e8SKurt Hackel 	return ret;
16446714d8e8SKurt Hackel }
16456714d8e8SKurt Hackel 
16466714d8e8SKurt Hackel /*
16476714d8e8SKurt Hackel  * locks that can be taken here:
16486714d8e8SKurt Hackel  * dlm->spinlock
16496714d8e8SKurt Hackel  * res->spinlock
16506714d8e8SKurt Hackel  * mle->spinlock
16516714d8e8SKurt Hackel  * dlm->master_list
16526714d8e8SKurt Hackel  *
16536714d8e8SKurt Hackel  * if possible, TRIM THIS DOWN!!!
16546714d8e8SKurt Hackel  */
16556714d8e8SKurt Hackel int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
16566714d8e8SKurt Hackel {
16576714d8e8SKurt Hackel 	struct dlm_ctxt *dlm = data;
16586714d8e8SKurt Hackel 	struct dlm_master_list_entry *mle = NULL;
16596714d8e8SKurt Hackel 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
16606714d8e8SKurt Hackel 	struct dlm_lock_resource *res = NULL;
16616714d8e8SKurt Hackel 	char *name;
1662a3d33291SMark Fasheh 	unsigned int namelen, hash;
16636714d8e8SKurt Hackel 	u32 flags;
16649c6510a5SKurt Hackel 	int master_request = 0;
16659c6510a5SKurt Hackel 	int ret = 0;
16666714d8e8SKurt Hackel 
16676714d8e8SKurt Hackel 	if (!dlm_grab(dlm))
16686714d8e8SKurt Hackel 		return 0;
16696714d8e8SKurt Hackel 
16706714d8e8SKurt Hackel 	name = assert->name;
16716714d8e8SKurt Hackel 	namelen = assert->namelen;
1672a3d33291SMark Fasheh 	hash = dlm_lockid_hash(name, namelen);
16736714d8e8SKurt Hackel 	flags = be32_to_cpu(assert->flags);
16746714d8e8SKurt Hackel 
16756714d8e8SKurt Hackel 	if (namelen > DLM_LOCKID_NAME_MAX) {
16766714d8e8SKurt Hackel 		mlog(ML_ERROR, "Invalid name length!");
16776714d8e8SKurt Hackel 		goto done;
16786714d8e8SKurt Hackel 	}
16796714d8e8SKurt Hackel 
16806714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
16816714d8e8SKurt Hackel 
16826714d8e8SKurt Hackel 	if (flags)
16836714d8e8SKurt Hackel 		mlog(0, "assert_master with flags: %u\n", flags);
16846714d8e8SKurt Hackel 
16856714d8e8SKurt Hackel 	/* find the MLE */
16866714d8e8SKurt Hackel 	spin_lock(&dlm->master_lock);
16876714d8e8SKurt Hackel 	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
16886714d8e8SKurt Hackel 		/* not an error, could be master just re-asserting */
16896714d8e8SKurt Hackel 		mlog(0, "just got an assert_master from %u, but no "
16906714d8e8SKurt Hackel 		     "MLE for it! (%.*s)\n", assert->node_idx,
16916714d8e8SKurt Hackel 		     namelen, name);
16926714d8e8SKurt Hackel 	} else {
16936714d8e8SKurt Hackel 		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
16946714d8e8SKurt Hackel 		if (bit >= O2NM_MAX_NODES) {
16956714d8e8SKurt Hackel 			/* not necessarily an error, though less likely.
16966714d8e8SKurt Hackel 			 * could be master just re-asserting. */
16976714d8e8SKurt Hackel 			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
16986714d8e8SKurt Hackel 			     "is asserting! (%.*s)\n", assert->node_idx,
16996714d8e8SKurt Hackel 			     namelen, name);
17006714d8e8SKurt Hackel 		} else if (bit != assert->node_idx) {
17016714d8e8SKurt Hackel 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
17026714d8e8SKurt Hackel 				mlog(0, "master %u was found, %u should "
17036714d8e8SKurt Hackel 				     "back off\n", assert->node_idx, bit);
17046714d8e8SKurt Hackel 			} else {
17056714d8e8SKurt Hackel 				/* with the fix for bug 569, a higher node
17066714d8e8SKurt Hackel 				 * number winning the mastery will respond
17076714d8e8SKurt Hackel 				 * YES to mastery requests, but this node
17086714d8e8SKurt Hackel 				 * had no way of knowing.  let it pass. */
17096714d8e8SKurt Hackel 				mlog(ML_ERROR, "%u is the lowest node, "
17106714d8e8SKurt Hackel 				     "%u is asserting. (%.*s)  %u must "
17116714d8e8SKurt Hackel 				     "have begun after %u won.\n", bit,
17126714d8e8SKurt Hackel 				     assert->node_idx, namelen, name, bit,
17136714d8e8SKurt Hackel 				     assert->node_idx);
17146714d8e8SKurt Hackel 			}
17156714d8e8SKurt Hackel 		}
17166714d8e8SKurt Hackel 	}
17176714d8e8SKurt Hackel 	spin_unlock(&dlm->master_lock);
17186714d8e8SKurt Hackel 
17196714d8e8SKurt Hackel 	/* ok everything checks out with the MLE
17206714d8e8SKurt Hackel 	 * now check to see if there is a lockres */
1721a3d33291SMark Fasheh 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
17226714d8e8SKurt Hackel 	if (res) {
17236714d8e8SKurt Hackel 		spin_lock(&res->spinlock);
17246714d8e8SKurt Hackel 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
17256714d8e8SKurt Hackel 			mlog(ML_ERROR, "%u asserting but %.*s is "
17266714d8e8SKurt Hackel 			     "RECOVERING!\n", assert->node_idx, namelen, name);
17276714d8e8SKurt Hackel 			goto kill;
17286714d8e8SKurt Hackel 		}
17296714d8e8SKurt Hackel 		if (!mle) {
17306714d8e8SKurt Hackel 			if (res->owner != assert->node_idx) {
17316714d8e8SKurt Hackel 				mlog(ML_ERROR, "assert_master from "
17326714d8e8SKurt Hackel 					  "%u, but current owner is "
17336714d8e8SKurt Hackel 					  "%u! (%.*s)\n",
17346714d8e8SKurt Hackel 				       assert->node_idx, res->owner,
17356714d8e8SKurt Hackel 				       namelen, name);
17366714d8e8SKurt Hackel 				goto kill;
17376714d8e8SKurt Hackel 			}
17386714d8e8SKurt Hackel 		} else if (mle->type != DLM_MLE_MIGRATION) {
17396714d8e8SKurt Hackel 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
17406714d8e8SKurt Hackel 				/* owner is just re-asserting */
17416714d8e8SKurt Hackel 				if (res->owner == assert->node_idx) {
17426714d8e8SKurt Hackel 					mlog(0, "owner %u re-asserting on "
17436714d8e8SKurt Hackel 					     "lock %.*s\n", assert->node_idx,
17446714d8e8SKurt Hackel 					     namelen, name);
17456714d8e8SKurt Hackel 					goto ok;
17466714d8e8SKurt Hackel 				}
17476714d8e8SKurt Hackel 				mlog(ML_ERROR, "got assert_master from "
17486714d8e8SKurt Hackel 				     "node %u, but %u is the owner! "
17496714d8e8SKurt Hackel 				     "(%.*s)\n", assert->node_idx,
17506714d8e8SKurt Hackel 				     res->owner, namelen, name);
17516714d8e8SKurt Hackel 				goto kill;
17526714d8e8SKurt Hackel 			}
17536714d8e8SKurt Hackel 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
17546714d8e8SKurt Hackel 				mlog(ML_ERROR, "got assert from %u, but lock "
17556714d8e8SKurt Hackel 				     "with no owner should be "
17566714d8e8SKurt Hackel 				     "in-progress! (%.*s)\n",
17576714d8e8SKurt Hackel 				     assert->node_idx,
17586714d8e8SKurt Hackel 				     namelen, name);
17596714d8e8SKurt Hackel 				goto kill;
17606714d8e8SKurt Hackel 			}
17616714d8e8SKurt Hackel 		} else /* mle->type == DLM_MLE_MIGRATION */ {
17626714d8e8SKurt Hackel 			/* should only be getting an assert from new master */
17636714d8e8SKurt Hackel 			if (assert->node_idx != mle->new_master) {
17646714d8e8SKurt Hackel 				mlog(ML_ERROR, "got assert from %u, but "
17656714d8e8SKurt Hackel 				     "new master is %u, and old master "
17666714d8e8SKurt Hackel 				     "was %u (%.*s)\n",
17676714d8e8SKurt Hackel 				     assert->node_idx, mle->new_master,
17686714d8e8SKurt Hackel 				     mle->master, namelen, name);
17696714d8e8SKurt Hackel 				goto kill;
17706714d8e8SKurt Hackel 			}
17716714d8e8SKurt Hackel 
17726714d8e8SKurt Hackel 		}
17736714d8e8SKurt Hackel ok:
17746714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
17756714d8e8SKurt Hackel 	}
17766714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
17776714d8e8SKurt Hackel 
17786714d8e8SKurt Hackel 	// mlog(0, "woo!  got an assert_master from node %u!\n",
17796714d8e8SKurt Hackel 	// 	     assert->node_idx);
17806714d8e8SKurt Hackel 	if (mle) {
17819c6510a5SKurt Hackel 		int extra_ref = 0;
17829c6510a5SKurt Hackel 		int nn = -1;
1783a2bf0477SKurt Hackel 		int rr, err = 0;
17846714d8e8SKurt Hackel 
17856714d8e8SKurt Hackel 		spin_lock(&mle->spinlock);
17869c6510a5SKurt Hackel 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
17879c6510a5SKurt Hackel 			extra_ref = 1;
17889c6510a5SKurt Hackel 		else {
17899c6510a5SKurt Hackel 			/* MASTER mle: if any bits set in the response map
17909c6510a5SKurt Hackel 			 * then the calling node needs to re-assert to clear
17919c6510a5SKurt Hackel 			 * up nodes that this node contacted */
17929c6510a5SKurt Hackel 			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
17939c6510a5SKurt Hackel 						    nn+1)) < O2NM_MAX_NODES) {
17949c6510a5SKurt Hackel 				if (nn != dlm->node_num && nn != assert->node_idx)
17959c6510a5SKurt Hackel 					master_request = 1;
17969c6510a5SKurt Hackel 			}
17979c6510a5SKurt Hackel 		}
17986714d8e8SKurt Hackel 		mle->master = assert->node_idx;
17996714d8e8SKurt Hackel 		atomic_set(&mle->woken, 1);
18006714d8e8SKurt Hackel 		wake_up(&mle->wq);
18016714d8e8SKurt Hackel 		spin_unlock(&mle->spinlock);
18026714d8e8SKurt Hackel 
1803a2bf0477SKurt Hackel 		if (res) {
1804a2bf0477SKurt Hackel 			spin_lock(&res->spinlock);
1805a2bf0477SKurt Hackel 			if (mle->type == DLM_MLE_MIGRATION) {
18066714d8e8SKurt Hackel 				mlog(0, "finishing off migration of lockres %.*s, "
18076714d8e8SKurt Hackel 			     		"from %u to %u\n",
18086714d8e8SKurt Hackel 			       		res->lockname.len, res->lockname.name,
18096714d8e8SKurt Hackel 			       		dlm->node_num, mle->new_master);
18106714d8e8SKurt Hackel 				res->state &= ~DLM_LOCK_RES_MIGRATING;
18116714d8e8SKurt Hackel 				dlm_change_lockres_owner(dlm, res, mle->new_master);
18126714d8e8SKurt Hackel 				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1813a2bf0477SKurt Hackel 			} else {
1814a2bf0477SKurt Hackel 				dlm_change_lockres_owner(dlm, res, mle->master);
1815a2bf0477SKurt Hackel 			}
18166714d8e8SKurt Hackel 			spin_unlock(&res->spinlock);
18176714d8e8SKurt Hackel 		}
18186714d8e8SKurt Hackel 
1819a2bf0477SKurt Hackel 		/* master is known, detach if not already detached.
1820a2bf0477SKurt Hackel 		 * ensures that only one assert_master call will happen
1821a2bf0477SKurt Hackel 		 * on this mle. */
1822a2bf0477SKurt Hackel 		spin_lock(&dlm->spinlock);
1823a2bf0477SKurt Hackel 		spin_lock(&dlm->master_lock);
1824a2bf0477SKurt Hackel 
1825a2bf0477SKurt Hackel 		rr = atomic_read(&mle->mle_refs.refcount);
1826a2bf0477SKurt Hackel 		if (mle->inuse > 0) {
1827a2bf0477SKurt Hackel 			if (extra_ref && rr < 3)
1828a2bf0477SKurt Hackel 				err = 1;
1829a2bf0477SKurt Hackel 			else if (!extra_ref && rr < 2)
1830a2bf0477SKurt Hackel 				err = 1;
1831a2bf0477SKurt Hackel 		} else {
1832a2bf0477SKurt Hackel 			if (extra_ref && rr < 2)
1833a2bf0477SKurt Hackel 				err = 1;
1834a2bf0477SKurt Hackel 			else if (!extra_ref && rr < 1)
1835a2bf0477SKurt Hackel 				err = 1;
1836a2bf0477SKurt Hackel 		}
1837a2bf0477SKurt Hackel 		if (err) {
1838a2bf0477SKurt Hackel 			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1839a2bf0477SKurt Hackel 			     "that will mess up this node, refs=%d, extra=%d, "
1840a2bf0477SKurt Hackel 			     "inuse=%d\n", dlm->name, namelen, name,
1841a2bf0477SKurt Hackel 			     assert->node_idx, rr, extra_ref, mle->inuse);
1842a2bf0477SKurt Hackel 			dlm_print_one_mle(mle);
1843a2bf0477SKurt Hackel 		}
1844a2bf0477SKurt Hackel 		list_del_init(&mle->list);
1845a2bf0477SKurt Hackel 		__dlm_mle_detach_hb_events(dlm, mle);
1846a2bf0477SKurt Hackel 		__dlm_put_mle(mle);
18476714d8e8SKurt Hackel 		if (extra_ref) {
18486714d8e8SKurt Hackel 			/* the assert master message now balances the extra
18496714d8e8SKurt Hackel 		 	 * ref given by the master / migration request message.
18506714d8e8SKurt Hackel 		 	 * if this is the last put, it will be removed
18516714d8e8SKurt Hackel 		 	 * from the list. */
1852a2bf0477SKurt Hackel 			__dlm_put_mle(mle);
1853a2bf0477SKurt Hackel 		}
1854a2bf0477SKurt Hackel 		spin_unlock(&dlm->master_lock);
1855a2bf0477SKurt Hackel 		spin_unlock(&dlm->spinlock);
1856a2bf0477SKurt Hackel 	} else if (res) {
1857a2bf0477SKurt Hackel 		if (res->owner != assert->node_idx) {
1858a2bf0477SKurt Hackel 			mlog(0, "assert_master from %u, but current "
1859a2bf0477SKurt Hackel 			     "owner is %u (%.*s), no mle\n", assert->node_idx,
1860a2bf0477SKurt Hackel 			     res->owner, namelen, name);
18616714d8e8SKurt Hackel 		}
18626714d8e8SKurt Hackel 	}
18636714d8e8SKurt Hackel 
18646714d8e8SKurt Hackel done:
18659c6510a5SKurt Hackel 	ret = 0;
18666714d8e8SKurt Hackel 	if (res)
18676714d8e8SKurt Hackel 		dlm_lockres_put(res);
18686714d8e8SKurt Hackel 	dlm_put(dlm);
18699c6510a5SKurt Hackel 	if (master_request) {
18709c6510a5SKurt Hackel 		mlog(0, "need to tell master to reassert\n");
18719c6510a5SKurt Hackel 		ret = EAGAIN;  // positive. negative would shoot down the node.
18729c6510a5SKurt Hackel 	}
18739c6510a5SKurt Hackel 	return ret;
18746714d8e8SKurt Hackel 
18756714d8e8SKurt Hackel kill:
18766714d8e8SKurt Hackel 	/* kill the caller! */
18776714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
18786714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
18796714d8e8SKurt Hackel 	dlm_lockres_put(res);
18806714d8e8SKurt Hackel 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
18816714d8e8SKurt Hackel 	     "and killing the other node now!  This node is OK and can continue.\n");
18826714d8e8SKurt Hackel 	dlm_dump_lock_resources(dlm);
18836714d8e8SKurt Hackel 	dlm_put(dlm);
18846714d8e8SKurt Hackel 	return -EINVAL;
18856714d8e8SKurt Hackel }
18866714d8e8SKurt Hackel 
18876714d8e8SKurt Hackel int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
18886714d8e8SKurt Hackel 			       struct dlm_lock_resource *res,
18896714d8e8SKurt Hackel 			       int ignore_higher, u8 request_from, u32 flags)
18906714d8e8SKurt Hackel {
18916714d8e8SKurt Hackel 	struct dlm_work_item *item;
18926714d8e8SKurt Hackel 	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
18936714d8e8SKurt Hackel 	if (!item)
18946714d8e8SKurt Hackel 		return -ENOMEM;
18956714d8e8SKurt Hackel 
18966714d8e8SKurt Hackel 
18976714d8e8SKurt Hackel 	/* queue up work for dlm_assert_master_worker */
18986714d8e8SKurt Hackel 	dlm_grab(dlm);  /* get an extra ref for the work item */
18996714d8e8SKurt Hackel 	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
19006714d8e8SKurt Hackel 	item->u.am.lockres = res; /* already have a ref */
19016714d8e8SKurt Hackel 	/* can optionally ignore node numbers higher than this node */
19026714d8e8SKurt Hackel 	item->u.am.ignore_higher = ignore_higher;
19036714d8e8SKurt Hackel 	item->u.am.request_from = request_from;
19046714d8e8SKurt Hackel 	item->u.am.flags = flags;
19056714d8e8SKurt Hackel 
19069c6510a5SKurt Hackel 	if (ignore_higher)
19079c6510a5SKurt Hackel 		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
19089c6510a5SKurt Hackel 		     res->lockname.name);
19099c6510a5SKurt Hackel 
19106714d8e8SKurt Hackel 	spin_lock(&dlm->work_lock);
19116714d8e8SKurt Hackel 	list_add_tail(&item->list, &dlm->work_list);
19126714d8e8SKurt Hackel 	spin_unlock(&dlm->work_lock);
19136714d8e8SKurt Hackel 
19146714d8e8SKurt Hackel 	schedule_work(&dlm->dispatched_work);
19156714d8e8SKurt Hackel 	return 0;
19166714d8e8SKurt Hackel }
19176714d8e8SKurt Hackel 
19186714d8e8SKurt Hackel static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
19196714d8e8SKurt Hackel {
19206714d8e8SKurt Hackel 	struct dlm_ctxt *dlm = data;
19216714d8e8SKurt Hackel 	int ret = 0;
19226714d8e8SKurt Hackel 	struct dlm_lock_resource *res;
19236714d8e8SKurt Hackel 	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
19246714d8e8SKurt Hackel 	int ignore_higher;
19256714d8e8SKurt Hackel 	int bit;
19266714d8e8SKurt Hackel 	u8 request_from;
19276714d8e8SKurt Hackel 	u32 flags;
19286714d8e8SKurt Hackel 
19296714d8e8SKurt Hackel 	dlm = item->dlm;
19306714d8e8SKurt Hackel 	res = item->u.am.lockres;
19316714d8e8SKurt Hackel 	ignore_higher = item->u.am.ignore_higher;
19326714d8e8SKurt Hackel 	request_from = item->u.am.request_from;
19336714d8e8SKurt Hackel 	flags = item->u.am.flags;
19346714d8e8SKurt Hackel 
19356714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
19366714d8e8SKurt Hackel 	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
19376714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
19386714d8e8SKurt Hackel 
19396714d8e8SKurt Hackel 	clear_bit(dlm->node_num, nodemap);
19406714d8e8SKurt Hackel 	if (ignore_higher) {
19416714d8e8SKurt Hackel 		/* if is this just to clear up mles for nodes below
19426714d8e8SKurt Hackel 		 * this node, do not send the message to the original
19436714d8e8SKurt Hackel 		 * caller or any node number higher than this */
19446714d8e8SKurt Hackel 		clear_bit(request_from, nodemap);
19456714d8e8SKurt Hackel 		bit = dlm->node_num;
19466714d8e8SKurt Hackel 		while (1) {
19476714d8e8SKurt Hackel 			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
19486714d8e8SKurt Hackel 					    bit+1);
19496714d8e8SKurt Hackel 		       	if (bit >= O2NM_MAX_NODES)
19506714d8e8SKurt Hackel 				break;
19516714d8e8SKurt Hackel 			clear_bit(bit, nodemap);
19526714d8e8SKurt Hackel 		}
19536714d8e8SKurt Hackel 	}
19546714d8e8SKurt Hackel 
19556714d8e8SKurt Hackel 	/* this call now finishes out the nodemap
19566714d8e8SKurt Hackel 	 * even if one or more nodes die */
19576714d8e8SKurt Hackel 	mlog(0, "worker about to master %.*s here, this=%u\n",
19586714d8e8SKurt Hackel 		     res->lockname.len, res->lockname.name, dlm->node_num);
19596714d8e8SKurt Hackel 	ret = dlm_do_assert_master(dlm, res->lockname.name,
19606714d8e8SKurt Hackel 				   res->lockname.len,
19616714d8e8SKurt Hackel 				   nodemap, flags);
19626714d8e8SKurt Hackel 	if (ret < 0) {
19636714d8e8SKurt Hackel 		/* no need to restart, we are done */
19646714d8e8SKurt Hackel 		mlog_errno(ret);
19656714d8e8SKurt Hackel 	}
19666714d8e8SKurt Hackel 
19676714d8e8SKurt Hackel 	dlm_lockres_put(res);
19686714d8e8SKurt Hackel 
19696714d8e8SKurt Hackel 	mlog(0, "finished with dlm_assert_master_worker\n");
19706714d8e8SKurt Hackel }
19716714d8e8SKurt Hackel 
/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
 * We cannot wait for node recovery to complete to begin mastering this
 * lockres because this lockres is used to kick off recovery! ;-)
 * So, do a pre-check on all living nodes to see if any of those nodes
 * think that $RECOVERY is currently mastered by a dead node.  If so,
 * we wait a short time to allow that node to get notified by its own
 * heartbeat stack, then check again.  All $RECOVERY lock resources
 * mastered by dead nodes are purged when the heartbeat callback is
 * fired, so we can know for sure that it is safe to continue once
 * the node returns a live node or no node.
 *
 * Returns 0 when it is safe to proceed, or -EAGAIN when some node
 * still believes a dead node masters the recovery lock (the caller
 * is expected to retry after a delay). */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	struct dlm_node_iter iter;
	int nodenum;
	int ret = 0;
	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;

	/* snapshot the domain map under the dlm spinlock */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
		if (ret < 0) {
			mlog_errno(ret);
			/* any failure other than the peer being down
			 * is unexpected and fatal */
			if (!dlm_is_host_down(ret))
				BUG();
			/* host is down, so answer for that node would be
			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
		}

		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* check to see if this master is in the recovery map */
			spin_lock(&dlm->spinlock);
			if (test_bit(master, dlm->recovery_map)) {
				mlog(ML_NOTICE, "%s: node %u has not seen "
				     "node %u go down yet, and thinks the "
				     "dead node is mastering the recovery "
				     "lock.  must wait.\n", dlm->name,
				     nodenum, master);
				ret = -EAGAIN;
			}
			spin_unlock(&dlm->spinlock);
			mlog(0, "%s: reco lock master is %u\n", dlm->name,
			     master);
			/* first definitive answer ends the scan */
			break;
		}
	}
	return ret;
}
2026c03872f5SKurt Hackel 
20276714d8e8SKurt Hackel 
20286714d8e8SKurt Hackel /*
20296714d8e8SKurt Hackel  * DLM_MIGRATE_LOCKRES
20306714d8e8SKurt Hackel  */
20316714d8e8SKurt Hackel 
20326714d8e8SKurt Hackel 
20336714d8e8SKurt Hackel int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
20346714d8e8SKurt Hackel 			u8 target)
20356714d8e8SKurt Hackel {
20366714d8e8SKurt Hackel 	struct dlm_master_list_entry *mle = NULL;
20376714d8e8SKurt Hackel 	struct dlm_master_list_entry *oldmle = NULL;
20386714d8e8SKurt Hackel  	struct dlm_migratable_lockres *mres = NULL;
20396714d8e8SKurt Hackel 	int ret = -EINVAL;
20406714d8e8SKurt Hackel 	const char *name;
20416714d8e8SKurt Hackel 	unsigned int namelen;
20426714d8e8SKurt Hackel 	int mle_added = 0;
20436714d8e8SKurt Hackel 	struct list_head *queue, *iter;
20446714d8e8SKurt Hackel 	int i;
20456714d8e8SKurt Hackel 	struct dlm_lock *lock;
20466714d8e8SKurt Hackel 	int empty = 1;
20476714d8e8SKurt Hackel 
20486714d8e8SKurt Hackel 	if (!dlm_grab(dlm))
20496714d8e8SKurt Hackel 		return -EINVAL;
20506714d8e8SKurt Hackel 
20516714d8e8SKurt Hackel 	name = res->lockname.name;
20526714d8e8SKurt Hackel 	namelen = res->lockname.len;
20536714d8e8SKurt Hackel 
20546714d8e8SKurt Hackel 	mlog(0, "migrating %.*s to %u\n", namelen, name, target);
20556714d8e8SKurt Hackel 
20566714d8e8SKurt Hackel 	/*
20576714d8e8SKurt Hackel 	 * ensure this lockres is a proper candidate for migration
20586714d8e8SKurt Hackel 	 */
20596714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
20606714d8e8SKurt Hackel 	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
20616714d8e8SKurt Hackel 		mlog(0, "cannot migrate lockres with unknown owner!\n");
20626714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
20636714d8e8SKurt Hackel 		goto leave;
20646714d8e8SKurt Hackel 	}
20656714d8e8SKurt Hackel 	if (res->owner != dlm->node_num) {
20666714d8e8SKurt Hackel 		mlog(0, "cannot migrate lockres this node doesn't own!\n");
20676714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
20686714d8e8SKurt Hackel 		goto leave;
20696714d8e8SKurt Hackel 	}
20706714d8e8SKurt Hackel 	mlog(0, "checking queues...\n");
20716714d8e8SKurt Hackel 	queue = &res->granted;
20726714d8e8SKurt Hackel 	for (i=0; i<3; i++) {
20736714d8e8SKurt Hackel 		list_for_each(iter, queue) {
20746714d8e8SKurt Hackel 			lock = list_entry (iter, struct dlm_lock, list);
20756714d8e8SKurt Hackel 			empty = 0;
20766714d8e8SKurt Hackel 			if (lock->ml.node == dlm->node_num) {
20776714d8e8SKurt Hackel 				mlog(0, "found a lock owned by this node "
20786714d8e8SKurt Hackel 				     "still on the %s queue!  will not "
20796714d8e8SKurt Hackel 				     "migrate this lockres\n",
20806714d8e8SKurt Hackel 				     i==0 ? "granted" :
20816714d8e8SKurt Hackel 				     (i==1 ? "converting" : "blocked"));
20826714d8e8SKurt Hackel 				spin_unlock(&res->spinlock);
20836714d8e8SKurt Hackel 				ret = -ENOTEMPTY;
20846714d8e8SKurt Hackel 				goto leave;
20856714d8e8SKurt Hackel 			}
20866714d8e8SKurt Hackel 		}
20876714d8e8SKurt Hackel 		queue++;
20886714d8e8SKurt Hackel 	}
20896714d8e8SKurt Hackel 	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
20906714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
20916714d8e8SKurt Hackel 
20926714d8e8SKurt Hackel 	/* no work to do */
20936714d8e8SKurt Hackel 	if (empty) {
20946714d8e8SKurt Hackel 		mlog(0, "no locks were found on this lockres! done!\n");
20956714d8e8SKurt Hackel 		ret = 0;
20966714d8e8SKurt Hackel 		goto leave;
20976714d8e8SKurt Hackel 	}
20986714d8e8SKurt Hackel 
20996714d8e8SKurt Hackel 	/*
21006714d8e8SKurt Hackel 	 * preallocate up front
21016714d8e8SKurt Hackel 	 * if this fails, abort
21026714d8e8SKurt Hackel 	 */
21036714d8e8SKurt Hackel 
21046714d8e8SKurt Hackel 	ret = -ENOMEM;
21056714d8e8SKurt Hackel 	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
21066714d8e8SKurt Hackel 	if (!mres) {
21076714d8e8SKurt Hackel 		mlog_errno(ret);
21086714d8e8SKurt Hackel 		goto leave;
21096714d8e8SKurt Hackel 	}
21106714d8e8SKurt Hackel 
21116714d8e8SKurt Hackel 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
21126714d8e8SKurt Hackel 								GFP_KERNEL);
21136714d8e8SKurt Hackel 	if (!mle) {
21146714d8e8SKurt Hackel 		mlog_errno(ret);
21156714d8e8SKurt Hackel 		goto leave;
21166714d8e8SKurt Hackel 	}
21176714d8e8SKurt Hackel 	ret = 0;
21186714d8e8SKurt Hackel 
21196714d8e8SKurt Hackel 	/*
21206714d8e8SKurt Hackel 	 * find a node to migrate the lockres to
21216714d8e8SKurt Hackel 	 */
21226714d8e8SKurt Hackel 
21236714d8e8SKurt Hackel 	mlog(0, "picking a migration node\n");
21246714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
21256714d8e8SKurt Hackel 	/* pick a new node */
21266714d8e8SKurt Hackel 	if (!test_bit(target, dlm->domain_map) ||
21276714d8e8SKurt Hackel 	    target >= O2NM_MAX_NODES) {
21286714d8e8SKurt Hackel 		target = dlm_pick_migration_target(dlm, res);
21296714d8e8SKurt Hackel 	}
21306714d8e8SKurt Hackel 	mlog(0, "node %u chosen for migration\n", target);
21316714d8e8SKurt Hackel 
21326714d8e8SKurt Hackel 	if (target >= O2NM_MAX_NODES ||
21336714d8e8SKurt Hackel 	    !test_bit(target, dlm->domain_map)) {
21346714d8e8SKurt Hackel 		/* target chosen is not alive */
21356714d8e8SKurt Hackel 		ret = -EINVAL;
21366714d8e8SKurt Hackel 	}
21376714d8e8SKurt Hackel 
21386714d8e8SKurt Hackel 	if (ret) {
21396714d8e8SKurt Hackel 		spin_unlock(&dlm->spinlock);
21406714d8e8SKurt Hackel 		goto fail;
21416714d8e8SKurt Hackel 	}
21426714d8e8SKurt Hackel 
21436714d8e8SKurt Hackel 	mlog(0, "continuing with target = %u\n", target);
21446714d8e8SKurt Hackel 
21456714d8e8SKurt Hackel 	/*
21466714d8e8SKurt Hackel 	 * clear any existing master requests and
21476714d8e8SKurt Hackel 	 * add the migration mle to the list
21486714d8e8SKurt Hackel 	 */
21496714d8e8SKurt Hackel 	spin_lock(&dlm->master_lock);
21506714d8e8SKurt Hackel 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
21516714d8e8SKurt Hackel 				    namelen, target, dlm->node_num);
21526714d8e8SKurt Hackel 	spin_unlock(&dlm->master_lock);
21536714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
21546714d8e8SKurt Hackel 
21556714d8e8SKurt Hackel 	if (ret == -EEXIST) {
21566714d8e8SKurt Hackel 		mlog(0, "another process is already migrating it\n");
21576714d8e8SKurt Hackel 		goto fail;
21586714d8e8SKurt Hackel 	}
21596714d8e8SKurt Hackel 	mle_added = 1;
21606714d8e8SKurt Hackel 
21616714d8e8SKurt Hackel 	/*
21626714d8e8SKurt Hackel 	 * set the MIGRATING flag and flush asts
21636714d8e8SKurt Hackel 	 * if we fail after this we need to re-dirty the lockres
21646714d8e8SKurt Hackel 	 */
21656714d8e8SKurt Hackel 	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
21666714d8e8SKurt Hackel 		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
21676714d8e8SKurt Hackel 		     "the target went down.\n", res->lockname.len,
21686714d8e8SKurt Hackel 		     res->lockname.name, target);
21696714d8e8SKurt Hackel 		spin_lock(&res->spinlock);
21706714d8e8SKurt Hackel 		res->state &= ~DLM_LOCK_RES_MIGRATING;
21716714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
21726714d8e8SKurt Hackel 		ret = -EINVAL;
21736714d8e8SKurt Hackel 	}
21746714d8e8SKurt Hackel 
21756714d8e8SKurt Hackel fail:
21766714d8e8SKurt Hackel 	if (oldmle) {
21776714d8e8SKurt Hackel 		/* master is known, detach if not already detached */
21786714d8e8SKurt Hackel 		dlm_mle_detach_hb_events(dlm, oldmle);
21796714d8e8SKurt Hackel 		dlm_put_mle(oldmle);
21806714d8e8SKurt Hackel 	}
21816714d8e8SKurt Hackel 
21826714d8e8SKurt Hackel 	if (ret < 0) {
21836714d8e8SKurt Hackel 		if (mle_added) {
21846714d8e8SKurt Hackel 			dlm_mle_detach_hb_events(dlm, mle);
21856714d8e8SKurt Hackel 			dlm_put_mle(mle);
21866714d8e8SKurt Hackel 		} else if (mle) {
21876714d8e8SKurt Hackel 			kmem_cache_free(dlm_mle_cache, mle);
21886714d8e8SKurt Hackel 		}
21896714d8e8SKurt Hackel 		goto leave;
21906714d8e8SKurt Hackel 	}
21916714d8e8SKurt Hackel 
21926714d8e8SKurt Hackel 	/*
21936714d8e8SKurt Hackel 	 * at this point, we have a migration target, an mle
21946714d8e8SKurt Hackel 	 * in the master list, and the MIGRATING flag set on
21956714d8e8SKurt Hackel 	 * the lockres
21966714d8e8SKurt Hackel 	 */
21976714d8e8SKurt Hackel 
21986714d8e8SKurt Hackel 
21996714d8e8SKurt Hackel 	/* get an extra reference on the mle.
22006714d8e8SKurt Hackel 	 * otherwise the assert_master from the new
22016714d8e8SKurt Hackel 	 * master will destroy this.
22026714d8e8SKurt Hackel 	 * also, make sure that all callers of dlm_get_mle
22036714d8e8SKurt Hackel 	 * take both dlm->spinlock and dlm->master_lock */
22046714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
22056714d8e8SKurt Hackel 	spin_lock(&dlm->master_lock);
2206a2bf0477SKurt Hackel 	dlm_get_mle_inuse(mle);
22076714d8e8SKurt Hackel 	spin_unlock(&dlm->master_lock);
22086714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
22096714d8e8SKurt Hackel 
22106714d8e8SKurt Hackel 	/* notify new node and send all lock state */
22116714d8e8SKurt Hackel 	/* call send_one_lockres with migration flag.
22126714d8e8SKurt Hackel 	 * this serves as notice to the target node that a
22136714d8e8SKurt Hackel 	 * migration is starting. */
22146714d8e8SKurt Hackel 	ret = dlm_send_one_lockres(dlm, res, mres, target,
22156714d8e8SKurt Hackel 				   DLM_MRES_MIGRATION);
22166714d8e8SKurt Hackel 
22176714d8e8SKurt Hackel 	if (ret < 0) {
22186714d8e8SKurt Hackel 		mlog(0, "migration to node %u failed with %d\n",
22196714d8e8SKurt Hackel 		     target, ret);
22206714d8e8SKurt Hackel 		/* migration failed, detach and clean up mle */
22216714d8e8SKurt Hackel 		dlm_mle_detach_hb_events(dlm, mle);
22226714d8e8SKurt Hackel 		dlm_put_mle(mle);
2223a2bf0477SKurt Hackel 		dlm_put_mle_inuse(mle);
2224a2bf0477SKurt Hackel 		spin_lock(&res->spinlock);
2225a2bf0477SKurt Hackel 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2226a2bf0477SKurt Hackel 		spin_unlock(&res->spinlock);
22276714d8e8SKurt Hackel 		goto leave;
22286714d8e8SKurt Hackel 	}
22296714d8e8SKurt Hackel 
22306714d8e8SKurt Hackel 	/* at this point, the target sends a message to all nodes,
22316714d8e8SKurt Hackel 	 * (using dlm_do_migrate_request).  this node is skipped since
22326714d8e8SKurt Hackel 	 * we had to put an mle in the list to begin the process.  this
22336714d8e8SKurt Hackel 	 * node now waits for target to do an assert master.  this node
22346714d8e8SKurt Hackel 	 * will be the last one notified, ensuring that the migration
22356714d8e8SKurt Hackel 	 * is complete everywhere.  if the target dies while this is
22366714d8e8SKurt Hackel 	 * going on, some nodes could potentially see the target as the
22376714d8e8SKurt Hackel 	 * master, so it is important that my recovery finds the migration
22386714d8e8SKurt Hackel 	 * mle and sets the master to UNKNONWN. */
22396714d8e8SKurt Hackel 
22406714d8e8SKurt Hackel 
22416714d8e8SKurt Hackel 	/* wait for new node to assert master */
22426714d8e8SKurt Hackel 	while (1) {
22436714d8e8SKurt Hackel 		ret = wait_event_interruptible_timeout(mle->wq,
22446714d8e8SKurt Hackel 					(atomic_read(&mle->woken) == 1),
22456714d8e8SKurt Hackel 					msecs_to_jiffies(5000));
22466714d8e8SKurt Hackel 
22476714d8e8SKurt Hackel 		if (ret >= 0) {
22486714d8e8SKurt Hackel 		       	if (atomic_read(&mle->woken) == 1 ||
22496714d8e8SKurt Hackel 			    res->owner == target)
22506714d8e8SKurt Hackel 				break;
22516714d8e8SKurt Hackel 
22526714d8e8SKurt Hackel 			mlog(0, "timed out during migration\n");
2253e2faea4cSKurt Hackel 			/* avoid hang during shutdown when migrating lockres
2254e2faea4cSKurt Hackel 			 * to a node which also goes down */
2255e2faea4cSKurt Hackel 			if (dlm_is_node_dead(dlm, target)) {
2256e2faea4cSKurt Hackel 				mlog(0, "%s:%.*s: expected migration target %u "
2257e2faea4cSKurt Hackel 				     "is no longer up.  restarting.\n",
2258e2faea4cSKurt Hackel 				     dlm->name, res->lockname.len,
2259e2faea4cSKurt Hackel 				     res->lockname.name, target);
2260e2faea4cSKurt Hackel 				ret = -ERESTARTSYS;
2261e2faea4cSKurt Hackel 			}
22626714d8e8SKurt Hackel 		}
22636714d8e8SKurt Hackel 		if (ret == -ERESTARTSYS) {
22646714d8e8SKurt Hackel 			/* migration failed, detach and clean up mle */
22656714d8e8SKurt Hackel 			dlm_mle_detach_hb_events(dlm, mle);
22666714d8e8SKurt Hackel 			dlm_put_mle(mle);
2267a2bf0477SKurt Hackel 			dlm_put_mle_inuse(mle);
2268a2bf0477SKurt Hackel 			spin_lock(&res->spinlock);
2269a2bf0477SKurt Hackel 			res->state &= ~DLM_LOCK_RES_MIGRATING;
2270a2bf0477SKurt Hackel 			spin_unlock(&res->spinlock);
22716714d8e8SKurt Hackel 			goto leave;
22726714d8e8SKurt Hackel 		}
22736714d8e8SKurt Hackel 		/* TODO: if node died: stop, clean up, return error */
22746714d8e8SKurt Hackel 	}
22756714d8e8SKurt Hackel 
22766714d8e8SKurt Hackel 	/* all done, set the owner, clear the flag */
22776714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
22786714d8e8SKurt Hackel 	dlm_set_lockres_owner(dlm, res, target);
22796714d8e8SKurt Hackel 	res->state &= ~DLM_LOCK_RES_MIGRATING;
22806714d8e8SKurt Hackel 	dlm_remove_nonlocal_locks(dlm, res);
22816714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
22826714d8e8SKurt Hackel 	wake_up(&res->wq);
22836714d8e8SKurt Hackel 
22846714d8e8SKurt Hackel 	/* master is known, detach if not already detached */
22856714d8e8SKurt Hackel 	dlm_mle_detach_hb_events(dlm, mle);
2286a2bf0477SKurt Hackel 	dlm_put_mle_inuse(mle);
22876714d8e8SKurt Hackel 	ret = 0;
22886714d8e8SKurt Hackel 
22896714d8e8SKurt Hackel 	dlm_lockres_calc_usage(dlm, res);
22906714d8e8SKurt Hackel 
22916714d8e8SKurt Hackel leave:
22926714d8e8SKurt Hackel 	/* re-dirty the lockres if we failed */
22936714d8e8SKurt Hackel 	if (ret < 0)
22946714d8e8SKurt Hackel 		dlm_kick_thread(dlm, res);
22956714d8e8SKurt Hackel 
22966714d8e8SKurt Hackel 	/* TODO: cleanup */
22976714d8e8SKurt Hackel 	if (mres)
22986714d8e8SKurt Hackel 		free_page((unsigned long)mres);
22996714d8e8SKurt Hackel 
23006714d8e8SKurt Hackel 	dlm_put(dlm);
23016714d8e8SKurt Hackel 
23026714d8e8SKurt Hackel 	mlog(0, "returning %d\n", ret);
23036714d8e8SKurt Hackel 	return ret;
23046714d8e8SKurt Hackel }
23056714d8e8SKurt Hackel EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
23066714d8e8SKurt Hackel 
23076714d8e8SKurt Hackel int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
23086714d8e8SKurt Hackel {
23096714d8e8SKurt Hackel 	int ret;
23106714d8e8SKurt Hackel 	spin_lock(&dlm->ast_lock);
23116714d8e8SKurt Hackel 	spin_lock(&lock->spinlock);
23126714d8e8SKurt Hackel 	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
23136714d8e8SKurt Hackel 	spin_unlock(&lock->spinlock);
23146714d8e8SKurt Hackel 	spin_unlock(&dlm->ast_lock);
23156714d8e8SKurt Hackel 	return ret;
23166714d8e8SKurt Hackel }
23176714d8e8SKurt Hackel 
23186714d8e8SKurt Hackel static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
23196714d8e8SKurt Hackel 				     struct dlm_lock_resource *res,
23206714d8e8SKurt Hackel 				     u8 mig_target)
23216714d8e8SKurt Hackel {
23226714d8e8SKurt Hackel 	int can_proceed;
23236714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
23246714d8e8SKurt Hackel 	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
23256714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
23266714d8e8SKurt Hackel 
23276714d8e8SKurt Hackel 	/* target has died, so make the caller break out of the
23286714d8e8SKurt Hackel 	 * wait_event, but caller must recheck the domain_map */
23296714d8e8SKurt Hackel 	spin_lock(&dlm->spinlock);
23306714d8e8SKurt Hackel 	if (!test_bit(mig_target, dlm->domain_map))
23316714d8e8SKurt Hackel 		can_proceed = 1;
23326714d8e8SKurt Hackel 	spin_unlock(&dlm->spinlock);
23336714d8e8SKurt Hackel 	return can_proceed;
23346714d8e8SKurt Hackel }
23356714d8e8SKurt Hackel 
23366714d8e8SKurt Hackel int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
23376714d8e8SKurt Hackel {
23386714d8e8SKurt Hackel 	int ret;
23396714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
23406714d8e8SKurt Hackel 	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
23416714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
23426714d8e8SKurt Hackel 	return ret;
23436714d8e8SKurt Hackel }
23446714d8e8SKurt Hackel 
23456714d8e8SKurt Hackel 
/* Set the MIGRATING flag on a lockres by draining all of its asts.
 *
 * Reserves an extra ast, kicks the dlm thread to flush everything
 * pending, then releases the reserved ast; once the last ast is done
 * the lockres transitions to MIGRATING (presumably inside
 * dlm_lockres_release_ast / the ast completion path -- TODO confirm),
 * which wakes the wait loop below.
 *
 * Returns 0 on success, or -EHOSTDOWN if @target left the domain while
 * we were waiting.  On return (either way) the MIGRATING flag is set,
 * no asts are pending, and any process reserving an ast must wait for
 * MIGRATING to clear. */
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res,
				       u8 target)
{
	int ret = 0;

	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
	       res->lockname.len, res->lockname.name, dlm->node_num,
	       target);
	/* need to set MIGRATING flag on lockres.  this is done by
	 * ensuring that all asts have been flushed for this lockres. */
	spin_lock(&res->spinlock);
	/* only one migration of a given lockres may be in flight */
	BUG_ON(res->migration_pending);
	res->migration_pending = 1;
	/* strategy is to reserve an extra ast then release
	 * it below, letting the release do all of the work */
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* now flush all the pending asts.. hang out for a bit */
	dlm_kick_thread(dlm, res);
	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
	dlm_lockres_release_ast(dlm, res);

	mlog(0, "about to wait on migration_wq, dirty=%s\n",
	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
	/* if the extra ref we just put was the final one, this
	 * will pass thru immediately.  otherwise, we need to wait
	 * for the last ast to finish. */
again:
	/* 1s timeout so we can re-log progress; interruptible wait can
	 * also return early on a signal, hence the recheck below */
	ret = wait_event_interruptible_timeout(dlm->migration_wq,
		   dlm_migration_can_proceed(dlm, res, target),
		   msecs_to_jiffies(1000));
	if (ret < 0) {
		mlog(0, "woken again: migrating? %s, dead? %s\n",
		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		       test_bit(target, dlm->domain_map) ? "no":"yes");
	} else {
		mlog(0, "all is well: migrating? %s, dead? %s\n",
		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		       test_bit(target, dlm->domain_map) ? "no":"yes");
	}
	/* loop until the predicate really holds (spurious wakeups,
	 * signals, and timeouts all land here) */
	if (!dlm_migration_can_proceed(dlm, res, target)) {
		mlog(0, "trying again...\n");
		goto again;
	}

	/* did the target go down or die? */
	spin_lock(&dlm->spinlock);
	if (!test_bit(target, dlm->domain_map)) {
		mlog(ML_ERROR, "aha. migration target %u just went down\n",
		     target);
		ret = -EHOSTDOWN;
	}
	spin_unlock(&dlm->spinlock);

	/*
	 * at this point:
	 *
	 *   o the DLM_LOCK_RES_MIGRATING flag is set
	 *   o there are no pending asts on this lockres
	 *   o all processes trying to reserve an ast on this
	 *     lockres must wait for the MIGRATING flag to clear
	 */
	return ret;
}
24126714d8e8SKurt Hackel 
24136714d8e8SKurt Hackel /* last step in the migration process.
24146714d8e8SKurt Hackel  * original master calls this to free all of the dlm_lock
24156714d8e8SKurt Hackel  * structures that used to be for other nodes. */
24166714d8e8SKurt Hackel static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
24176714d8e8SKurt Hackel 				      struct dlm_lock_resource *res)
24186714d8e8SKurt Hackel {
24196714d8e8SKurt Hackel 	struct list_head *iter, *iter2;
24206714d8e8SKurt Hackel 	struct list_head *queue = &res->granted;
24216714d8e8SKurt Hackel 	int i;
24226714d8e8SKurt Hackel 	struct dlm_lock *lock;
24236714d8e8SKurt Hackel 
24246714d8e8SKurt Hackel 	assert_spin_locked(&res->spinlock);
24256714d8e8SKurt Hackel 
24266714d8e8SKurt Hackel 	BUG_ON(res->owner == dlm->node_num);
24276714d8e8SKurt Hackel 
24286714d8e8SKurt Hackel 	for (i=0; i<3; i++) {
24296714d8e8SKurt Hackel 		list_for_each_safe(iter, iter2, queue) {
24306714d8e8SKurt Hackel 			lock = list_entry (iter, struct dlm_lock, list);
24316714d8e8SKurt Hackel 			if (lock->ml.node != dlm->node_num) {
24326714d8e8SKurt Hackel 				mlog(0, "putting lock for node %u\n",
24336714d8e8SKurt Hackel 				     lock->ml.node);
24346714d8e8SKurt Hackel 				/* be extra careful */
24356714d8e8SKurt Hackel 				BUG_ON(!list_empty(&lock->ast_list));
24366714d8e8SKurt Hackel 				BUG_ON(!list_empty(&lock->bast_list));
24376714d8e8SKurt Hackel 				BUG_ON(lock->ast_pending);
24386714d8e8SKurt Hackel 				BUG_ON(lock->bast_pending);
24396714d8e8SKurt Hackel 				list_del_init(&lock->list);
24406714d8e8SKurt Hackel 				dlm_lock_put(lock);
24416714d8e8SKurt Hackel 			}
24426714d8e8SKurt Hackel 		}
24436714d8e8SKurt Hackel 		queue++;
24446714d8e8SKurt Hackel 	}
24456714d8e8SKurt Hackel }
24466714d8e8SKurt Hackel 
24476714d8e8SKurt Hackel /* for now this is not too intelligent.  we will
24486714d8e8SKurt Hackel  * need stats to make this do the right thing.
24496714d8e8SKurt Hackel  * this just finds the first lock on one of the
24506714d8e8SKurt Hackel  * queues and uses that node as the target. */
24516714d8e8SKurt Hackel static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
24526714d8e8SKurt Hackel 				    struct dlm_lock_resource *res)
24536714d8e8SKurt Hackel {
24546714d8e8SKurt Hackel 	int i;
24556714d8e8SKurt Hackel 	struct list_head *queue = &res->granted;
24566714d8e8SKurt Hackel 	struct list_head *iter;
24576714d8e8SKurt Hackel 	struct dlm_lock *lock;
24586714d8e8SKurt Hackel 	int nodenum;
24596714d8e8SKurt Hackel 
24606714d8e8SKurt Hackel 	assert_spin_locked(&dlm->spinlock);
24616714d8e8SKurt Hackel 
24626714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
24636714d8e8SKurt Hackel 	for (i=0; i<3; i++) {
24646714d8e8SKurt Hackel 		list_for_each(iter, queue) {
24656714d8e8SKurt Hackel 			/* up to the caller to make sure this node
24666714d8e8SKurt Hackel 			 * is alive */
24676714d8e8SKurt Hackel 			lock = list_entry (iter, struct dlm_lock, list);
24686714d8e8SKurt Hackel 			if (lock->ml.node != dlm->node_num) {
24696714d8e8SKurt Hackel 				spin_unlock(&res->spinlock);
24706714d8e8SKurt Hackel 				return lock->ml.node;
24716714d8e8SKurt Hackel 			}
24726714d8e8SKurt Hackel 		}
24736714d8e8SKurt Hackel 		queue++;
24746714d8e8SKurt Hackel 	}
24756714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
24766714d8e8SKurt Hackel 	mlog(0, "have not found a suitable target yet! checking domain map\n");
24776714d8e8SKurt Hackel 
24786714d8e8SKurt Hackel 	/* ok now we're getting desperate.  pick anyone alive. */
24796714d8e8SKurt Hackel 	nodenum = -1;
24806714d8e8SKurt Hackel 	while (1) {
24816714d8e8SKurt Hackel 		nodenum = find_next_bit(dlm->domain_map,
24826714d8e8SKurt Hackel 					O2NM_MAX_NODES, nodenum+1);
24836714d8e8SKurt Hackel 		mlog(0, "found %d in domain map\n", nodenum);
24846714d8e8SKurt Hackel 		if (nodenum >= O2NM_MAX_NODES)
24856714d8e8SKurt Hackel 			break;
24866714d8e8SKurt Hackel 		if (nodenum != dlm->node_num) {
24876714d8e8SKurt Hackel 			mlog(0, "picking %d\n", nodenum);
24886714d8e8SKurt Hackel 			return nodenum;
24896714d8e8SKurt Hackel 		}
24906714d8e8SKurt Hackel 	}
24916714d8e8SKurt Hackel 
24926714d8e8SKurt Hackel 	mlog(0, "giving up.  no master to migrate to\n");
24936714d8e8SKurt Hackel 	return DLM_LOCK_RES_OWNER_UNKNOWN;
24946714d8e8SKurt Hackel }
24956714d8e8SKurt Hackel 
24966714d8e8SKurt Hackel 
24976714d8e8SKurt Hackel 
24986714d8e8SKurt Hackel /* this is called by the new master once all lockres
24996714d8e8SKurt Hackel  * data has been received */
25006714d8e8SKurt Hackel static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
25016714d8e8SKurt Hackel 				  struct dlm_lock_resource *res,
25026714d8e8SKurt Hackel 				  u8 master, u8 new_master,
25036714d8e8SKurt Hackel 				  struct dlm_node_iter *iter)
25046714d8e8SKurt Hackel {
25056714d8e8SKurt Hackel 	struct dlm_migrate_request migrate;
25066714d8e8SKurt Hackel 	int ret, status = 0;
25076714d8e8SKurt Hackel 	int nodenum;
25086714d8e8SKurt Hackel 
25096714d8e8SKurt Hackel 	memset(&migrate, 0, sizeof(migrate));
25106714d8e8SKurt Hackel 	migrate.namelen = res->lockname.len;
25116714d8e8SKurt Hackel 	memcpy(migrate.name, res->lockname.name, migrate.namelen);
25126714d8e8SKurt Hackel 	migrate.new_master = new_master;
25136714d8e8SKurt Hackel 	migrate.master = master;
25146714d8e8SKurt Hackel 
25156714d8e8SKurt Hackel 	ret = 0;
25166714d8e8SKurt Hackel 
25176714d8e8SKurt Hackel 	/* send message to all nodes, except the master and myself */
25186714d8e8SKurt Hackel 	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
25196714d8e8SKurt Hackel 		if (nodenum == master ||
25206714d8e8SKurt Hackel 		    nodenum == new_master)
25216714d8e8SKurt Hackel 			continue;
25226714d8e8SKurt Hackel 
25236714d8e8SKurt Hackel 		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
25246714d8e8SKurt Hackel 					 &migrate, sizeof(migrate), nodenum,
25256714d8e8SKurt Hackel 					 &status);
25266714d8e8SKurt Hackel 		if (ret < 0)
25276714d8e8SKurt Hackel 			mlog_errno(ret);
25286714d8e8SKurt Hackel 		else if (status < 0) {
25296714d8e8SKurt Hackel 			mlog(0, "migrate request (node %u) returned %d!\n",
25306714d8e8SKurt Hackel 			     nodenum, status);
25316714d8e8SKurt Hackel 			ret = status;
25326714d8e8SKurt Hackel 		}
25336714d8e8SKurt Hackel 	}
25346714d8e8SKurt Hackel 
25356714d8e8SKurt Hackel 	if (ret < 0)
25366714d8e8SKurt Hackel 		mlog_errno(ret);
25376714d8e8SKurt Hackel 
25386714d8e8SKurt Hackel 	mlog(0, "returning ret=%d\n", ret);
25396714d8e8SKurt Hackel 	return ret;
25406714d8e8SKurt Hackel }
25416714d8e8SKurt Hackel 
25426714d8e8SKurt Hackel 
/* if there is an existing mle for this lockres, we now know who the master is.
 * (the one who sent us *this* message) we can clear it up right away.
 * since the process that put the mle on the list still has a reference to it,
 * we can unhash it now, set the master and wake the process.  as a result,
 * we will have no mle in the list to start with.  now we can add an mle for
 * the migration and this should be the only one found for those scanning the
 * list.
 *
 * Network handler for DLM_MIGRATE_REQUEST_MSG.  Returns 0 on success
 * or a negative errno, which o2net passes back to the sender as the
 * message status. */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
	const char *name;
	unsigned int namelen, hash;
	int ret = 0;

	/* take a domain reference; bail if the domain is going away */
	if (!dlm_grab(dlm))
		return -EINVAL;

	name = migrate->name;
	namelen = migrate->namelen;
	hash = dlm_lockid_hash(name, namelen);

	/* preallocate.. if this fails, abort */
	/* NOTE(review): allocated before taking dlm->spinlock so we never
	 * sleep under the lock.  GFP_KERNEL assumes this handler runs in
	 * process context -- confirm against o2net handler rules. */
	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
							 GFP_KERNEL);

	if (!mle) {
		ret = -ENOMEM;
		goto leave;
	}

	/* check for pre-existing lock */
	/* lock ordering: dlm->spinlock, then dlm->master_lock, then
	 * res->spinlock -- same order used throughout this file */
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
	spin_lock(&dlm->master_lock);

	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
			spin_unlock(&res->spinlock);
			mlog(ML_ERROR, "Got a migrate request, but the "
			     "lockres is marked as recovering!");
			kmem_cache_free(dlm_mle_cache, mle);
			ret = -EINVAL; /* need a better solution */
			goto unlock;
		}
		/* block local users of this lockres until migration is done */
		res->state |= DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
	}

	/* ignore status.  only nonzero status would BUG. */
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
				    name, namelen,
				    migrate->new_master,
				    migrate->master);

unlock:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	/* drop the lookup reference taken by __dlm_lookup_lockres */
	if (res)
		dlm_lockres_put(res);
leave:
	dlm_put(dlm);
	return ret;
}
26206714d8e8SKurt Hackel 
/* must be holding dlm->spinlock and dlm->master_lock
 * when adding a migration mle, we can clear any other mles
 * in the master list because we know with certainty that
 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
 * the new migration mle.  this way we can hold with the rule
 * of having only one mle for a given lock name at all times.
 *
 * On return, *oldmle points at any pre-existing entry found for this
 * name (with a reference taken by dlm_find_mle; the caller must detach
 * and put it).  Returns 0 on success, or -EEXIST if another local
 * process already started migrating this same lockres. */
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master)
{
	int found;
	int ret = 0;

	*oldmle = NULL;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* caller is responsible for any ref taken here on oldmle */
	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
	if (found) {
		struct dlm_master_list_entry *tmp = *oldmle;
		spin_lock(&tmp->spinlock);
		if (tmp->type == DLM_MLE_MIGRATION) {
			if (master == dlm->node_num) {
				/* ah another process raced me to it */
				mlog(0, "tried to migrate %.*s, but some "
				     "process beat me to it\n",
				     namelen, name);
				ret = -EEXIST;
			} else {
				/* bad.  2 NODES are trying to migrate! */
				mlog(ML_ERROR, "migration error  mle: "
				     "master=%u new_master=%u // request: "
				     "master=%u new_master=%u // "
				     "lockres=%.*s\n",
				     tmp->master, tmp->new_master,
				     master, new_master,
				     namelen, name);
				BUG();
			}
		} else {
			/* this is essentially what assert_master does:
			 * record the known master and wake any waiter */
			tmp->master = master;
			atomic_set(&tmp->woken, 1);
			wake_up(&tmp->wq);
			/* remove it from the list so that only one
			 * mle will be found */
			list_del_init(&tmp->list);
		}
		spin_unlock(&tmp->spinlock);
	}

	/* now add a migration mle to the tail of the list */
	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
	mle->new_master = new_master;
	mle->master = master;
	/* do this for consistency with other mle types */
	set_bit(new_master, mle->maybe_map);
	list_add(&mle->list, &dlm->master_list);

	return ret;
}
26906714d8e8SKurt Hackel 
26916714d8e8SKurt Hackel 
/* Clean dlm->master_list after dead_node has died.  Called during
 * recovery with dlm->spinlock held.  BLOCK mles whose expected master
 * was the dead node are woken and dropped; MIGRATION mles involving
 * the dead node are unlinked, and their lockres (if one exists) has
 * its owner set to UNKNOWN and is moved to the recovery list. */
void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct list_head *iter, *iter2;
	struct dlm_master_list_entry *mle;
	struct dlm_lock_resource *res;
	unsigned int hash;

	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
	assert_spin_locked(&dlm->spinlock);

	/* clean the master list */
	spin_lock(&dlm->master_lock);
	list_for_each_safe(iter, iter2, &dlm->master_list) {
		mle = list_entry(iter, struct dlm_master_list_entry, list);

		BUG_ON(mle->type != DLM_MLE_BLOCK &&
		       mle->type != DLM_MLE_MASTER &&
		       mle->type != DLM_MLE_MIGRATION);

		/* MASTER mles are initiated locally.  the waiting
		 * process will notice the node map change
		 * shortly.  let that happen as normal. */
		if (mle->type == DLM_MLE_MASTER)
			continue;


		/* BLOCK mles are initiated by other nodes.
		 * need to clean up if the dead node would have
		 * been the master. */
		if (mle->type == DLM_MLE_BLOCK) {
			int bit;

			spin_lock(&mle->spinlock);
			/* the lowest bit in maybe_map is the node that
			 * would have become master */
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (bit != dead_node) {
				mlog(0, "mle found, but dead node %u would "
				     "not have been master\n", dead_node);
				spin_unlock(&mle->spinlock);
			} else {
				/* must drop the refcount by one since the
				 * assert_master will never arrive.  this
				 * may result in the mle being unlinked and
				 * freed, but there may still be a process
				 * waiting in the dlmlock path which is fine. */
				mlog(ML_ERROR, "node %u was expected master\n",
				     dead_node);
				atomic_set(&mle->woken, 1);
				spin_unlock(&mle->spinlock);
				wake_up(&mle->wq);
				/* do not need events any longer, so detach
				 * from heartbeat */
				__dlm_mle_detach_hb_events(dlm, mle);
				__dlm_put_mle(mle);
			}
			continue;
		}

		/* everything else is a MIGRATION mle */

		/* the rule for MIGRATION mles is that the master
		 * becomes UNKNOWN if *either* the original or
		 * the new master dies.  all UNKNOWN lockreses
		 * are sent to whichever node becomes the recovery
		 * master.  the new master is responsible for
		 * determining if there is still a master for
		 * this lockres, or if he needs to take over
		 * mastery.  either way, this node should expect
		 * another message to resolve this. */
		if (mle->master != dead_node &&
		    mle->new_master != dead_node)
			continue;

		/* if we have reached this point, this mle needs to
		 * be removed from the list and freed. */

		/* remove from the list early.  NOTE: unlinking
		 * list_head while in list_for_each_safe */
		spin_lock(&mle->spinlock);
		list_del_init(&mle->list);
		atomic_set(&mle->woken, 1);
		spin_unlock(&mle->spinlock);
		wake_up(&mle->wq);

		mlog(0, "node %u died during migration from "
		     "%u to %u!\n", dead_node,
		     mle->master, mle->new_master);
		/* if there is a lockres associated with this
	 	 * mle, find it and set its owner to UNKNOWN */
		hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
					   mle->u.name.len, hash);
		if (res) {
			/* unfortunately if we hit this rare case, our
		 	 * lock ordering is messed.  we need to drop
		 	 * the master lock so that we can take the
		  	 * lockres lock, meaning that we will have to
			 * restart from the head of list. */
			spin_unlock(&dlm->master_lock);

			/* move lockres onto recovery list */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res,
				      	DLM_LOCK_RES_OWNER_UNKNOWN);
			dlm_move_lockres_to_recovery_list(dlm, res);
			spin_unlock(&res->spinlock);
			dlm_lockres_put(res);

			/* about to get rid of mle, detach from heartbeat */
			__dlm_mle_detach_hb_events(dlm, mle);

			/* dump the mle */
			spin_lock(&dlm->master_lock);
			__dlm_put_mle(mle);
			spin_unlock(&dlm->master_lock);

			/* restart */
			goto top;
		}

		/* this may be the last reference */
		__dlm_put_mle(mle);
	}
	spin_unlock(&dlm->master_lock);
}
28176714d8e8SKurt Hackel 
28186714d8e8SKurt Hackel 
/* Final phase of migration, run on the new master once all lockres
 * data has been received: broadcast the migrate request to the rest of
 * the domain, assert mastery to everyone (including, separately, the
 * old master so it can tear down its copy), then take ownership
 * locally and clear the MIGRATING state.  Returns 0 on success or a
 * negative errno if the migrate request broadcast failed. */
int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 u8 old_master)
{
	struct dlm_node_iter iter;
	int ret = 0;

	/* snapshot the domain map, excluding the old master and ourselves */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	clear_bit(old_master, iter.node_map);
	clear_bit(dlm->node_num, iter.node_map);
	spin_unlock(&dlm->spinlock);

	mlog(0, "now time to do a migrate request to other nodes\n");
	ret = dlm_do_migrate_request(dlm, res, old_master,
				     dlm->node_num, &iter);
	if (ret < 0) {
		mlog_errno(ret);
		goto leave;
	}

	mlog(0, "doing assert master of %.*s to all except the original node\n",
	     res->lockname.len, res->lockname.name);
	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		/* no longer need to retry.  all living nodes contacted. */
		mlog_errno(ret);
		ret = 0;
	}

	/* reuse the iterator's node map to target only the old master */
	memset(iter.node_map, 0, sizeof(iter.node_map));
	set_bit(old_master, iter.node_map);
	mlog(0, "doing assert master of %.*s back to %u\n",
	     res->lockname.len, res->lockname.name, old_master);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len, iter.node_map,
				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
	if (ret < 0) {
		mlog(0, "assert master to original master failed "
		     "with %d.\n", ret);
		/* the only nonzero status here would be because of
		 * a dead original node.  we're done. */
		ret = 0;
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, dlm->node_num);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	spin_unlock(&res->spinlock);
	/* re-dirty it on the new master */
	dlm_kick_thread(dlm, res);
	wake_up(&res->wq);
leave:
	return ret;
}
28786714d8e8SKurt Hackel 
28796714d8e8SKurt Hackel /*
28806714d8e8SKurt Hackel  * LOCKRES AST REFCOUNT
28816714d8e8SKurt Hackel  * this is integral to migration
28826714d8e8SKurt Hackel  */
28836714d8e8SKurt Hackel 
28846714d8e8SKurt Hackel /* for future intent to call an ast, reserve one ahead of time.
28856714d8e8SKurt Hackel  * this should be called only after waiting on the lockres
28866714d8e8SKurt Hackel  * with dlm_wait_on_lockres, and while still holding the
28876714d8e8SKurt Hackel  * spinlock after the call. */
28886714d8e8SKurt Hackel void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
28896714d8e8SKurt Hackel {
28906714d8e8SKurt Hackel 	assert_spin_locked(&res->spinlock);
28916714d8e8SKurt Hackel 	if (res->state & DLM_LOCK_RES_MIGRATING) {
28926714d8e8SKurt Hackel 		__dlm_print_one_lock_resource(res);
28936714d8e8SKurt Hackel 	}
28946714d8e8SKurt Hackel 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
28956714d8e8SKurt Hackel 
28966714d8e8SKurt Hackel 	atomic_inc(&res->asts_reserved);
28976714d8e8SKurt Hackel }
28986714d8e8SKurt Hackel 
28996714d8e8SKurt Hackel /*
29006714d8e8SKurt Hackel  * used to drop the reserved ast, either because it went unused,
29016714d8e8SKurt Hackel  * or because the ast/bast was actually called.
29026714d8e8SKurt Hackel  *
29036714d8e8SKurt Hackel  * also, if there is a pending migration on this lockres,
29046714d8e8SKurt Hackel  * and this was the last pending ast on the lockres,
29056714d8e8SKurt Hackel  * atomically set the MIGRATING flag before we drop the lock.
29066714d8e8SKurt Hackel  * this is how we ensure that migration can proceed with no
29076714d8e8SKurt Hackel  * asts in progress.  note that it is ok if the state of the
29086714d8e8SKurt Hackel  * queues is such that a lock should be granted in the future
29096714d8e8SKurt Hackel  * or that a bast should be fired, because the new master will
29106714d8e8SKurt Hackel  * shuffle the lists on this lockres as soon as it is migrated.
29116714d8e8SKurt Hackel  */
29126714d8e8SKurt Hackel void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
29136714d8e8SKurt Hackel 			     struct dlm_lock_resource *res)
29146714d8e8SKurt Hackel {
29156714d8e8SKurt Hackel 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
29166714d8e8SKurt Hackel 		return;
29176714d8e8SKurt Hackel 
29186714d8e8SKurt Hackel 	if (!res->migration_pending) {
29196714d8e8SKurt Hackel 		spin_unlock(&res->spinlock);
29206714d8e8SKurt Hackel 		return;
29216714d8e8SKurt Hackel 	}
29226714d8e8SKurt Hackel 
29236714d8e8SKurt Hackel 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
29246714d8e8SKurt Hackel 	res->migration_pending = 0;
29256714d8e8SKurt Hackel 	res->state |= DLM_LOCK_RES_MIGRATING;
29266714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
29276714d8e8SKurt Hackel 	wake_up(&res->wq);
29286714d8e8SKurt Hackel 	wake_up(&dlm->migration_wq);
29296714d8e8SKurt Hackel }
2930