xref: /openbmc/linux/fs/ocfs2/dlm/dlmconvert.c (revision 8be98d2f2a0a262f8bf8a0bc1fdf522b3c7aab17)
1328970deSThomas Gleixner // SPDX-License-Identifier: GPL-2.0-or-later
2*fa60ce2cSMasahiro Yamada /*
36714d8e8SKurt Hackel  * dlmconvert.c
46714d8e8SKurt Hackel  *
56714d8e8SKurt Hackel  * underlying calls for lock conversion
66714d8e8SKurt Hackel  *
76714d8e8SKurt Hackel  * Copyright (C) 2004 Oracle.  All rights reserved.
86714d8e8SKurt Hackel  */
96714d8e8SKurt Hackel 
106714d8e8SKurt Hackel 
116714d8e8SKurt Hackel #include <linux/module.h>
126714d8e8SKurt Hackel #include <linux/fs.h>
136714d8e8SKurt Hackel #include <linux/types.h>
146714d8e8SKurt Hackel #include <linux/highmem.h>
156714d8e8SKurt Hackel #include <linux/init.h>
166714d8e8SKurt Hackel #include <linux/sysctl.h>
176714d8e8SKurt Hackel #include <linux/random.h>
186714d8e8SKurt Hackel #include <linux/blkdev.h>
196714d8e8SKurt Hackel #include <linux/socket.h>
206714d8e8SKurt Hackel #include <linux/inet.h>
216714d8e8SKurt Hackel #include <linux/spinlock.h>
226714d8e8SKurt Hackel 
236714d8e8SKurt Hackel 
24ca322fb6SMasahiro Yamada #include "../cluster/heartbeat.h"
25ca322fb6SMasahiro Yamada #include "../cluster/nodemanager.h"
26ca322fb6SMasahiro Yamada #include "../cluster/tcp.h"
276714d8e8SKurt Hackel 
286714d8e8SKurt Hackel #include "dlmapi.h"
296714d8e8SKurt Hackel #include "dlmcommon.h"
306714d8e8SKurt Hackel 
316714d8e8SKurt Hackel #include "dlmconvert.h"
326714d8e8SKurt Hackel 
336714d8e8SKurt Hackel #define MLOG_MASK_PREFIX ML_DLM
34ca322fb6SMasahiro Yamada #include "../cluster/masklog.h"
356714d8e8SKurt Hackel 
/*
 * Locking note: __dlmconvert_master() is the only function in this file
 * that must be entered with a spinlock held (res->spinlock), and it is the
 * only one that still holds a lock (res->spinlock) when it returns.
 * Every other function here needs no locks on entry and drops every lock
 * it acquires before returning.
 */
static enum dlm_status
__dlmconvert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
		    struct dlm_lock *lock, int flags, int type,
		    int *call_ast, int *kick_thread);
static enum dlm_status
dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res,
				struct dlm_lock *lock, int flags, int type);
496714d8e8SKurt Hackel 
506714d8e8SKurt Hackel /*
516714d8e8SKurt Hackel  * this is only called directly by dlmlock(), and only when the
526714d8e8SKurt Hackel  * local node is the owner of the lockres
536714d8e8SKurt Hackel  * locking:
546714d8e8SKurt Hackel  *   caller needs:  none
556714d8e8SKurt Hackel  *   taken:         takes and drops res->spinlock
566714d8e8SKurt Hackel  *   held on exit:  none
576714d8e8SKurt Hackel  * returns: see __dlmconvert_master
586714d8e8SKurt Hackel  */
dlmconvert_master(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)596714d8e8SKurt Hackel enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
606714d8e8SKurt Hackel 				  struct dlm_lock_resource *res,
616714d8e8SKurt Hackel 				  struct dlm_lock *lock, int flags, int type)
626714d8e8SKurt Hackel {
636714d8e8SKurt Hackel 	int call_ast = 0, kick_thread = 0;
646714d8e8SKurt Hackel 	enum dlm_status status;
656714d8e8SKurt Hackel 
666714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
676714d8e8SKurt Hackel 	/* we are not in a network handler, this is fine */
686714d8e8SKurt Hackel 	__dlm_wait_on_lockres(res);
696714d8e8SKurt Hackel 	__dlm_lockres_reserve_ast(res);
706714d8e8SKurt Hackel 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
716714d8e8SKurt Hackel 
726714d8e8SKurt Hackel 	status = __dlmconvert_master(dlm, res, lock, flags, type,
736714d8e8SKurt Hackel 				     &call_ast, &kick_thread);
746714d8e8SKurt Hackel 
756714d8e8SKurt Hackel 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
766714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
776714d8e8SKurt Hackel 	wake_up(&res->wq);
786714d8e8SKurt Hackel 	if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
796714d8e8SKurt Hackel 		dlm_error(status);
806714d8e8SKurt Hackel 
816714d8e8SKurt Hackel 	/* either queue the ast or release it */
826714d8e8SKurt Hackel 	if (call_ast)
836714d8e8SKurt Hackel 		dlm_queue_ast(dlm, lock);
846714d8e8SKurt Hackel 	else
856714d8e8SKurt Hackel 		dlm_lockres_release_ast(dlm, res);
866714d8e8SKurt Hackel 
876714d8e8SKurt Hackel 	if (kick_thread)
886714d8e8SKurt Hackel 		dlm_kick_thread(dlm, res);
896714d8e8SKurt Hackel 
906714d8e8SKurt Hackel 	return status;
916714d8e8SKurt Hackel }
926714d8e8SKurt Hackel 
936714d8e8SKurt Hackel /* performs lock conversion at the lockres master site
946714d8e8SKurt Hackel  * locking:
956714d8e8SKurt Hackel  *   caller needs:  res->spinlock
966714d8e8SKurt Hackel  *   taken:         takes and drops lock->spinlock
976714d8e8SKurt Hackel  *   held on exit:  res->spinlock
986714d8e8SKurt Hackel  * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
996714d8e8SKurt Hackel  *   call_ast: whether ast should be called for this lock
1006714d8e8SKurt Hackel  *   kick_thread: whether dlm_kick_thread should be called
1016714d8e8SKurt Hackel  */
__dlmconvert_master(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type,int * call_ast,int * kick_thread)1026714d8e8SKurt Hackel static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
1036714d8e8SKurt Hackel 					   struct dlm_lock_resource *res,
1046714d8e8SKurt Hackel 					   struct dlm_lock *lock, int flags,
1056714d8e8SKurt Hackel 					   int type, int *call_ast,
1066714d8e8SKurt Hackel 					   int *kick_thread)
1076714d8e8SKurt Hackel {
1086714d8e8SKurt Hackel 	enum dlm_status status = DLM_NORMAL;
1096714d8e8SKurt Hackel 	struct dlm_lock *tmplock=NULL;
1106714d8e8SKurt Hackel 
1116714d8e8SKurt Hackel 	assert_spin_locked(&res->spinlock);
1126714d8e8SKurt Hackel 
113ef6b689bSTao Ma 	mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
1146714d8e8SKurt Hackel 	     lock->ml.type, lock->ml.convert_type, type);
1156714d8e8SKurt Hackel 
1166714d8e8SKurt Hackel 	spin_lock(&lock->spinlock);
1176714d8e8SKurt Hackel 
1186714d8e8SKurt Hackel 	/* already converting? */
1196714d8e8SKurt Hackel 	if (lock->ml.convert_type != LKM_IVMODE) {
1206714d8e8SKurt Hackel 		mlog(ML_ERROR, "attempted to convert a lock with a lock "
1216714d8e8SKurt Hackel 		     "conversion pending\n");
1226714d8e8SKurt Hackel 		status = DLM_DENIED;
1236714d8e8SKurt Hackel 		goto unlock_exit;
1246714d8e8SKurt Hackel 	}
1256714d8e8SKurt Hackel 
1266714d8e8SKurt Hackel 	/* must be on grant queue to convert */
1276714d8e8SKurt Hackel 	if (!dlm_lock_on_list(&res->granted, lock)) {
1286714d8e8SKurt Hackel 		mlog(ML_ERROR, "attempted to convert a lock not on grant "
1296714d8e8SKurt Hackel 		     "queue\n");
1306714d8e8SKurt Hackel 		status = DLM_DENIED;
1316714d8e8SKurt Hackel 		goto unlock_exit;
1326714d8e8SKurt Hackel 	}
1336714d8e8SKurt Hackel 
1346714d8e8SKurt Hackel 	if (flags & LKM_VALBLK) {
1356714d8e8SKurt Hackel 		switch (lock->ml.type) {
1366714d8e8SKurt Hackel 			case LKM_EXMODE:
1376714d8e8SKurt Hackel 				/* EX + LKM_VALBLK + convert == set lvb */
1386714d8e8SKurt Hackel 				mlog(0, "will set lvb: converting %s->%s\n",
1396714d8e8SKurt Hackel 				     dlm_lock_mode_name(lock->ml.type),
1406714d8e8SKurt Hackel 				     dlm_lock_mode_name(type));
1416714d8e8SKurt Hackel 				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
1426714d8e8SKurt Hackel 				break;
1436714d8e8SKurt Hackel 			case LKM_PRMODE:
1446714d8e8SKurt Hackel 			case LKM_NLMODE:
1456714d8e8SKurt Hackel 				/* refetch if new level is not NL */
1466714d8e8SKurt Hackel 				if (type > LKM_NLMODE) {
1476714d8e8SKurt Hackel 					mlog(0, "will fetch new value into "
1486714d8e8SKurt Hackel 					     "lvb: converting %s->%s\n",
1496714d8e8SKurt Hackel 					     dlm_lock_mode_name(lock->ml.type),
1506714d8e8SKurt Hackel 					     dlm_lock_mode_name(type));
1516714d8e8SKurt Hackel 					lock->lksb->flags |= DLM_LKSB_GET_LVB;
1526714d8e8SKurt Hackel 				} else {
1536714d8e8SKurt Hackel 					mlog(0, "will NOT fetch new value "
1546714d8e8SKurt Hackel 					     "into lvb: converting %s->%s\n",
1556714d8e8SKurt Hackel 					     dlm_lock_mode_name(lock->ml.type),
1566714d8e8SKurt Hackel 					     dlm_lock_mode_name(type));
1576714d8e8SKurt Hackel 					flags &= ~(LKM_VALBLK);
1586714d8e8SKurt Hackel 				}
1596714d8e8SKurt Hackel 				break;
1606714d8e8SKurt Hackel 		}
1616714d8e8SKurt Hackel 	}
1626714d8e8SKurt Hackel 
1636714d8e8SKurt Hackel 
1646714d8e8SKurt Hackel 	/* in-place downconvert? */
1656714d8e8SKurt Hackel 	if (type <= lock->ml.type)
1666714d8e8SKurt Hackel 		goto grant;
1676714d8e8SKurt Hackel 
1686714d8e8SKurt Hackel 	/* upconvert from here on */
1696714d8e8SKurt Hackel 	status = DLM_NORMAL;
170df53cd3bSDong Fang 	list_for_each_entry(tmplock, &res->granted, list) {
1716714d8e8SKurt Hackel 		if (tmplock == lock)
1726714d8e8SKurt Hackel 			continue;
1736714d8e8SKurt Hackel 		if (!dlm_lock_compatible(tmplock->ml.type, type))
1746714d8e8SKurt Hackel 			goto switch_queues;
1756714d8e8SKurt Hackel 	}
1766714d8e8SKurt Hackel 
177df53cd3bSDong Fang 	list_for_each_entry(tmplock, &res->converting, list) {
1786714d8e8SKurt Hackel 		if (!dlm_lock_compatible(tmplock->ml.type, type))
1796714d8e8SKurt Hackel 			goto switch_queues;
1806714d8e8SKurt Hackel 		/* existing conversion requests take precedence */
1816714d8e8SKurt Hackel 		if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
1826714d8e8SKurt Hackel 			goto switch_queues;
1836714d8e8SKurt Hackel 	}
1846714d8e8SKurt Hackel 
1856714d8e8SKurt Hackel 	/* fall thru to grant */
1866714d8e8SKurt Hackel 
1876714d8e8SKurt Hackel grant:
1886714d8e8SKurt Hackel 	mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
1896714d8e8SKurt Hackel 	     res->lockname.name, dlm_lock_mode_name(type));
1906714d8e8SKurt Hackel 	/* immediately grant the new lock type */
1916714d8e8SKurt Hackel 	lock->lksb->status = DLM_NORMAL;
1926714d8e8SKurt Hackel 	if (lock->ml.node == dlm->node_num)
1936714d8e8SKurt Hackel 		mlog(0, "doing in-place convert for nonlocal lock\n");
1946714d8e8SKurt Hackel 	lock->ml.type = type;
195c0a8520cSMark Fasheh 	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
196c0a8520cSMark Fasheh 		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
197c0a8520cSMark Fasheh 
198e5054c9aSxuejiufei 	/*
199e5054c9aSxuejiufei 	 * Move the lock to the tail because it may be the only lock which has
200e5054c9aSxuejiufei 	 * an invalid lvb.
201e5054c9aSxuejiufei 	 */
202e5054c9aSxuejiufei 	list_move_tail(&lock->list, &res->granted);
203e5054c9aSxuejiufei 
2046714d8e8SKurt Hackel 	status = DLM_NORMAL;
2056714d8e8SKurt Hackel 	*call_ast = 1;
2066714d8e8SKurt Hackel 	goto unlock_exit;
2076714d8e8SKurt Hackel 
2086714d8e8SKurt Hackel switch_queues:
2096714d8e8SKurt Hackel 	if (flags & LKM_NOQUEUE) {
2106714d8e8SKurt Hackel 		mlog(0, "failed to convert NOQUEUE lock %.*s from "
2116714d8e8SKurt Hackel 		     "%d to %d...\n", res->lockname.len, res->lockname.name,
2126714d8e8SKurt Hackel 		     lock->ml.type, type);
2136714d8e8SKurt Hackel 		status = DLM_NOTQUEUED;
2146714d8e8SKurt Hackel 		goto unlock_exit;
2156714d8e8SKurt Hackel 	}
2166714d8e8SKurt Hackel 	mlog(0, "res %.*s, queueing...\n", res->lockname.len,
2176714d8e8SKurt Hackel 	     res->lockname.name);
2186714d8e8SKurt Hackel 
2196714d8e8SKurt Hackel 	lock->ml.convert_type = type;
2206714d8e8SKurt Hackel 	/* do not alter lock refcount.  switching lists. */
221f116629dSAkinobu Mita 	list_move_tail(&lock->list, &res->converting);
2226714d8e8SKurt Hackel 
2236714d8e8SKurt Hackel unlock_exit:
2246714d8e8SKurt Hackel 	spin_unlock(&lock->spinlock);
2256714d8e8SKurt Hackel 	if (status == DLM_DENIED) {
2266714d8e8SKurt Hackel 		__dlm_print_one_lock_resource(res);
2276714d8e8SKurt Hackel 	}
2286714d8e8SKurt Hackel 	if (status == DLM_NORMAL)
2296714d8e8SKurt Hackel 		*kick_thread = 1;
2306714d8e8SKurt Hackel 	return status;
2316714d8e8SKurt Hackel }
2326714d8e8SKurt Hackel 
dlm_revert_pending_convert(struct dlm_lock_resource * res,struct dlm_lock * lock)2336714d8e8SKurt Hackel void dlm_revert_pending_convert(struct dlm_lock_resource *res,
2346714d8e8SKurt Hackel 				struct dlm_lock *lock)
2356714d8e8SKurt Hackel {
2366714d8e8SKurt Hackel 	/* do not alter lock refcount.  switching lists. */
237f116629dSAkinobu Mita 	list_move_tail(&lock->list, &res->granted);
2386714d8e8SKurt Hackel 	lock->ml.convert_type = LKM_IVMODE;
2396714d8e8SKurt Hackel 	lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
2406714d8e8SKurt Hackel }
2416714d8e8SKurt Hackel 
2426714d8e8SKurt Hackel /* messages the master site to do lock conversion
2436714d8e8SKurt Hackel  * locking:
2446714d8e8SKurt Hackel  *   caller needs:  none
2456714d8e8SKurt Hackel  *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
2466714d8e8SKurt Hackel  *   held on exit:  none
2476714d8e8SKurt Hackel  * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
2486714d8e8SKurt Hackel  */
dlmconvert_remote(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)2496714d8e8SKurt Hackel enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
2506714d8e8SKurt Hackel 				  struct dlm_lock_resource *res,
2516714d8e8SKurt Hackel 				  struct dlm_lock *lock, int flags, int type)
2526714d8e8SKurt Hackel {
2536714d8e8SKurt Hackel 	enum dlm_status status;
2546714d8e8SKurt Hackel 
2556714d8e8SKurt Hackel 	mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
2566714d8e8SKurt Hackel 	     lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
2576714d8e8SKurt Hackel 
2586714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
2596714d8e8SKurt Hackel 	if (res->state & DLM_LOCK_RES_RECOVERING) {
2606714d8e8SKurt Hackel 		mlog(0, "bailing out early since res is RECOVERING "
2616714d8e8SKurt Hackel 		     "on secondary queue\n");
2626714d8e8SKurt Hackel 		/* __dlm_print_one_lock_resource(res); */
2636714d8e8SKurt Hackel 		status = DLM_RECOVERING;
2646714d8e8SKurt Hackel 		goto bail;
2656714d8e8SKurt Hackel 	}
2666714d8e8SKurt Hackel 	/* will exit this call with spinlock held */
2676714d8e8SKurt Hackel 	__dlm_wait_on_lockres(res);
2686714d8e8SKurt Hackel 
2696714d8e8SKurt Hackel 	if (lock->ml.convert_type != LKM_IVMODE) {
2706714d8e8SKurt Hackel 		__dlm_print_one_lock_resource(res);
2716714d8e8SKurt Hackel 		mlog(ML_ERROR, "converting a remote lock that is already "
27229004858SKurt Hackel 		     "converting! (cookie=%u:%llu, conv=%d)\n",
27374aa2585SKurt Hackel 		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
27474aa2585SKurt Hackel 		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
27529004858SKurt Hackel 		     lock->ml.convert_type);
2766714d8e8SKurt Hackel 		status = DLM_DENIED;
2776714d8e8SKurt Hackel 		goto bail;
2786714d8e8SKurt Hackel 	}
279be12b299SJoseph Qi 
280be12b299SJoseph Qi 	if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
281be12b299SJoseph Qi 		mlog(0, "last convert request returned DLM_RECOVERING, but "
282be12b299SJoseph Qi 		     "owner has already queued and sent ast to me. res %.*s, "
283be12b299SJoseph Qi 		     "(cookie=%u:%llu, type=%d, conv=%d)\n",
284be12b299SJoseph Qi 		     res->lockname.len, res->lockname.name,
285be12b299SJoseph Qi 		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
286be12b299SJoseph Qi 		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
287be12b299SJoseph Qi 		     lock->ml.type, lock->ml.convert_type);
288be12b299SJoseph Qi 		status = DLM_NORMAL;
289be12b299SJoseph Qi 		goto bail;
290be12b299SJoseph Qi 	}
291be12b299SJoseph Qi 
2926714d8e8SKurt Hackel 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
2936714d8e8SKurt Hackel 	/* move lock to local convert queue */
2946714d8e8SKurt Hackel 	/* do not alter lock refcount.  switching lists. */
295f116629dSAkinobu Mita 	list_move_tail(&lock->list, &res->converting);
2966714d8e8SKurt Hackel 	lock->convert_pending = 1;
2976714d8e8SKurt Hackel 	lock->ml.convert_type = type;
2986714d8e8SKurt Hackel 
2996714d8e8SKurt Hackel 	if (flags & LKM_VALBLK) {
3006714d8e8SKurt Hackel 		if (lock->ml.type == LKM_EXMODE) {
3016714d8e8SKurt Hackel 			flags |= LKM_PUT_LVB;
3026714d8e8SKurt Hackel 			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
3036714d8e8SKurt Hackel 		} else {
3046714d8e8SKurt Hackel 			if (lock->ml.convert_type == LKM_NLMODE)
3056714d8e8SKurt Hackel 				flags &= ~LKM_VALBLK;
3066714d8e8SKurt Hackel 			else {
3076714d8e8SKurt Hackel 				flags |= LKM_GET_LVB;
3086714d8e8SKurt Hackel 				lock->lksb->flags |= DLM_LKSB_GET_LVB;
3096714d8e8SKurt Hackel 			}
3106714d8e8SKurt Hackel 		}
3116714d8e8SKurt Hackel 	}
3126714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
3136714d8e8SKurt Hackel 
3146714d8e8SKurt Hackel 	/* no locks held here.
3156714d8e8SKurt Hackel 	 * need to wait for a reply as to whether it got queued or not. */
3166714d8e8SKurt Hackel 	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
3176714d8e8SKurt Hackel 
3186714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
3196714d8e8SKurt Hackel 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
320ac7cf246SJoseph Qi 	/* if it failed, move it back to granted queue.
321ac7cf246SJoseph Qi 	 * if master returns DLM_NORMAL and then down before sending ast,
322ac7cf246SJoseph Qi 	 * it may have already been moved to granted queue, reset to
323ac7cf246SJoseph Qi 	 * DLM_RECOVERING and retry convert */
3246714d8e8SKurt Hackel 	if (status != DLM_NORMAL) {
3256714d8e8SKurt Hackel 		if (status != DLM_NOTQUEUED)
3266714d8e8SKurt Hackel 			dlm_error(status);
3276714d8e8SKurt Hackel 		dlm_revert_pending_convert(res, lock);
328e6f0c6e6SJoseph Qi 	} else if (!lock->convert_pending) {
329e6f0c6e6SJoseph Qi 		mlog(0, "%s: res %.*s, owner died and lock has been moved back "
330e6f0c6e6SJoseph Qi 				"to granted list, retry convert.\n",
331e6f0c6e6SJoseph Qi 				dlm->name, res->lockname.len, res->lockname.name);
332ac7cf246SJoseph Qi 		status = DLM_RECOVERING;
3336714d8e8SKurt Hackel 	}
334e6f0c6e6SJoseph Qi 
335e6f0c6e6SJoseph Qi 	lock->convert_pending = 0;
3366714d8e8SKurt Hackel bail:
3376714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
3386714d8e8SKurt Hackel 
3396714d8e8SKurt Hackel 	/* TODO: should this be a wake_one? */
3406714d8e8SKurt Hackel 	/* wake up any IN_PROGRESS waiters */
3416714d8e8SKurt Hackel 	wake_up(&res->wq);
3426714d8e8SKurt Hackel 
3436714d8e8SKurt Hackel 	return status;
3446714d8e8SKurt Hackel }
3456714d8e8SKurt Hackel 
3466714d8e8SKurt Hackel /* sends DLM_CONVERT_LOCK_MSG to master site
3476714d8e8SKurt Hackel  * locking:
3486714d8e8SKurt Hackel  *   caller needs:  none
3496714d8e8SKurt Hackel  *   taken:         none
3506714d8e8SKurt Hackel  *   held on exit:  none
3516714d8e8SKurt Hackel  * returns: DLM_NOLOCKMGR, status from remote node
3526714d8e8SKurt Hackel  */
dlm_send_remote_convert_request(struct dlm_ctxt * dlm,struct dlm_lock_resource * res,struct dlm_lock * lock,int flags,int type)3536714d8e8SKurt Hackel static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
3546714d8e8SKurt Hackel 					   struct dlm_lock_resource *res,
3556714d8e8SKurt Hackel 					   struct dlm_lock *lock, int flags, int type)
3566714d8e8SKurt Hackel {
3576714d8e8SKurt Hackel 	struct dlm_convert_lock convert;
3586714d8e8SKurt Hackel 	int tmpret;
3596714d8e8SKurt Hackel 	enum dlm_status ret;
3606714d8e8SKurt Hackel 	int status = 0;
3616714d8e8SKurt Hackel 	struct kvec vec[2];
3626714d8e8SKurt Hackel 	size_t veclen = 1;
3636714d8e8SKurt Hackel 
364ef6b689bSTao Ma 	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
3656714d8e8SKurt Hackel 
3666714d8e8SKurt Hackel 	memset(&convert, 0, sizeof(struct dlm_convert_lock));
3676714d8e8SKurt Hackel 	convert.node_idx = dlm->node_num;
3686714d8e8SKurt Hackel 	convert.requested_type = type;
3696714d8e8SKurt Hackel 	convert.cookie = lock->ml.cookie;
3706714d8e8SKurt Hackel 	convert.namelen = res->lockname.len;
3716714d8e8SKurt Hackel 	convert.flags = cpu_to_be32(flags);
3726714d8e8SKurt Hackel 	memcpy(convert.name, res->lockname.name, convert.namelen);
3736714d8e8SKurt Hackel 
3746714d8e8SKurt Hackel 	vec[0].iov_len = sizeof(struct dlm_convert_lock);
3756714d8e8SKurt Hackel 	vec[0].iov_base = &convert;
3766714d8e8SKurt Hackel 
3776714d8e8SKurt Hackel 	if (flags & LKM_PUT_LVB) {
3786714d8e8SKurt Hackel 		/* extra data to send if we are updating lvb */
3796714d8e8SKurt Hackel 		vec[1].iov_len = DLM_LVB_LEN;
3806714d8e8SKurt Hackel 		vec[1].iov_base = lock->lksb->lvb;
3816714d8e8SKurt Hackel 		veclen++;
3826714d8e8SKurt Hackel 	}
3836714d8e8SKurt Hackel 
3846714d8e8SKurt Hackel 	tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
3856714d8e8SKurt Hackel 					vec, veclen, res->owner, &status);
3866714d8e8SKurt Hackel 	if (tmpret >= 0) {
3876714d8e8SKurt Hackel 		// successfully sent and received
3886714d8e8SKurt Hackel 		ret = status;  // this is already a dlm_status
3896714d8e8SKurt Hackel 		if (ret == DLM_RECOVERING) {
3906714d8e8SKurt Hackel 			mlog(0, "node %u returned DLM_RECOVERING from convert "
3916714d8e8SKurt Hackel 			     "message!\n", res->owner);
3926714d8e8SKurt Hackel 		} else if (ret == DLM_MIGRATING) {
3936714d8e8SKurt Hackel 			mlog(0, "node %u returned DLM_MIGRATING from convert "
3946714d8e8SKurt Hackel 			     "message!\n", res->owner);
3956714d8e8SKurt Hackel 		} else if (ret == DLM_FORWARD) {
3966714d8e8SKurt Hackel 			mlog(0, "node %u returned DLM_FORWARD from convert "
3976714d8e8SKurt Hackel 			     "message!\n", res->owner);
3986714d8e8SKurt Hackel 		} else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
3996714d8e8SKurt Hackel 			dlm_error(ret);
4006714d8e8SKurt Hackel 	} else {
401a5196ec5SWengang Wang 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
402a5196ec5SWengang Wang 		     "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
403a5196ec5SWengang Wang 		     res->owner);
4046714d8e8SKurt Hackel 		if (dlm_is_host_down(tmpret)) {
40544465a7dSKurt Hackel 			/* instead of logging the same network error over
40644465a7dSKurt Hackel 			 * and over, sleep here and wait for the heartbeat
40744465a7dSKurt Hackel 			 * to notice the node is dead.  times out after 5s. */
40844465a7dSKurt Hackel 			dlm_wait_for_node_death(dlm, res->owner,
40944465a7dSKurt Hackel 						DLM_NODE_DEATH_WAIT_MAX);
4106714d8e8SKurt Hackel 			ret = DLM_RECOVERING;
4116714d8e8SKurt Hackel 			mlog(0, "node %u died so returning DLM_RECOVERING "
4126714d8e8SKurt Hackel 			     "from convert message!\n", res->owner);
4136714d8e8SKurt Hackel 		} else {
4146714d8e8SKurt Hackel 			ret = dlm_err_to_dlm_status(tmpret);
4156714d8e8SKurt Hackel 		}
4166714d8e8SKurt Hackel 	}
4176714d8e8SKurt Hackel 
4186714d8e8SKurt Hackel 	return ret;
4196714d8e8SKurt Hackel }
4206714d8e8SKurt Hackel 
4216714d8e8SKurt Hackel /* handler for DLM_CONVERT_LOCK_MSG on master site
4226714d8e8SKurt Hackel  * locking:
4236714d8e8SKurt Hackel  *   caller needs:  none
4246714d8e8SKurt Hackel  *   taken:         takes and drop res->spinlock
4256714d8e8SKurt Hackel  *   held on exit:  none
4266714d8e8SKurt Hackel  * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
4276714d8e8SKurt Hackel  *          status from __dlmconvert_master
4286714d8e8SKurt Hackel  */
dlm_convert_lock_handler(struct o2net_msg * msg,u32 len,void * data,void ** ret_data)429d74c9803SKurt Hackel int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
430d74c9803SKurt Hackel 			     void **ret_data)
4316714d8e8SKurt Hackel {
4326714d8e8SKurt Hackel 	struct dlm_ctxt *dlm = data;
4336714d8e8SKurt Hackel 	struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
4346714d8e8SKurt Hackel 	struct dlm_lock_resource *res = NULL;
4356714d8e8SKurt Hackel 	struct dlm_lock *lock = NULL;
436df53cd3bSDong Fang 	struct dlm_lock *tmp_lock;
4376714d8e8SKurt Hackel 	struct dlm_lockstatus *lksb;
4386714d8e8SKurt Hackel 	enum dlm_status status = DLM_NORMAL;
4396714d8e8SKurt Hackel 	u32 flags;
440a6fa3640SKurt Hackel 	int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
4416714d8e8SKurt Hackel 
4426714d8e8SKurt Hackel 	if (!dlm_grab(dlm)) {
4436714d8e8SKurt Hackel 		dlm_error(DLM_REJECTED);
4446714d8e8SKurt Hackel 		return DLM_REJECTED;
4456714d8e8SKurt Hackel 	}
4466714d8e8SKurt Hackel 
4476714d8e8SKurt Hackel 	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
4486714d8e8SKurt Hackel 			"Domain %s not fully joined!\n", dlm->name);
4496714d8e8SKurt Hackel 
4506714d8e8SKurt Hackel 	if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
4516714d8e8SKurt Hackel 		status = DLM_IVBUFLEN;
4526714d8e8SKurt Hackel 		dlm_error(status);
4536714d8e8SKurt Hackel 		goto leave;
4546714d8e8SKurt Hackel 	}
4556714d8e8SKurt Hackel 
4566714d8e8SKurt Hackel 	flags = be32_to_cpu(cnv->flags);
4576714d8e8SKurt Hackel 
4586714d8e8SKurt Hackel 	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
4596714d8e8SKurt Hackel 	     (LKM_PUT_LVB|LKM_GET_LVB)) {
4606714d8e8SKurt Hackel 		mlog(ML_ERROR, "both PUT and GET lvb specified\n");
4616714d8e8SKurt Hackel 		status = DLM_BADARGS;
4626714d8e8SKurt Hackel 		goto leave;
4636714d8e8SKurt Hackel 	}
4646714d8e8SKurt Hackel 
4656714d8e8SKurt Hackel 	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
4666714d8e8SKurt Hackel 	     (flags & LKM_GET_LVB ? "get lvb" : "none"));
4676714d8e8SKurt Hackel 
4686714d8e8SKurt Hackel 	status = DLM_IVLOCKID;
4696714d8e8SKurt Hackel 	res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
4706714d8e8SKurt Hackel 	if (!res) {
4716714d8e8SKurt Hackel 		dlm_error(status);
4726714d8e8SKurt Hackel 		goto leave;
4736714d8e8SKurt Hackel 	}
4746714d8e8SKurt Hackel 
4756714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
476b220532aSKurt Hackel 	status = __dlm_lockres_state_to_status(res);
477b220532aSKurt Hackel 	if (status != DLM_NORMAL) {
478b220532aSKurt Hackel 		spin_unlock(&res->spinlock);
479b220532aSKurt Hackel 		dlm_error(status);
480b220532aSKurt Hackel 		goto leave;
481b220532aSKurt Hackel 	}
482df53cd3bSDong Fang 	list_for_each_entry(tmp_lock, &res->granted, list) {
483df53cd3bSDong Fang 		if (tmp_lock->ml.cookie == cnv->cookie &&
484df53cd3bSDong Fang 		    tmp_lock->ml.node == cnv->node_idx) {
485df53cd3bSDong Fang 			lock = tmp_lock;
4866714d8e8SKurt Hackel 			dlm_lock_get(lock);
4876714d8e8SKurt Hackel 			break;
4886714d8e8SKurt Hackel 		}
4896714d8e8SKurt Hackel 	}
4906714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
4916714d8e8SKurt Hackel 	if (!lock) {
4926714d8e8SKurt Hackel 		status = DLM_IVLOCKID;
49390aaaf1cSKurt Hackel 		mlog(ML_ERROR, "did not find lock to convert on grant queue! "
49490aaaf1cSKurt Hackel 			       "cookie=%u:%llu\n",
49574aa2585SKurt Hackel 		     dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
49674aa2585SKurt Hackel 		     dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
4972af37ce8STao Ma 		dlm_print_one_lock_resource(res);
4986714d8e8SKurt Hackel 		goto leave;
4996714d8e8SKurt Hackel 	}
5006714d8e8SKurt Hackel 
5016714d8e8SKurt Hackel 	/* found the lock */
5026714d8e8SKurt Hackel 	lksb = lock->lksb;
5036714d8e8SKurt Hackel 
5046714d8e8SKurt Hackel 	/* see if caller needed to get/put lvb */
5056714d8e8SKurt Hackel 	if (flags & LKM_PUT_LVB) {
5066714d8e8SKurt Hackel 		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
5076714d8e8SKurt Hackel 		lksb->flags |= DLM_LKSB_PUT_LVB;
5086714d8e8SKurt Hackel 		memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
5096714d8e8SKurt Hackel 	} else if (flags & LKM_GET_LVB) {
5106714d8e8SKurt Hackel 		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
5116714d8e8SKurt Hackel 		lksb->flags |= DLM_LKSB_GET_LVB;
5126714d8e8SKurt Hackel 	}
5136714d8e8SKurt Hackel 
5146714d8e8SKurt Hackel 	spin_lock(&res->spinlock);
5156714d8e8SKurt Hackel 	status = __dlm_lockres_state_to_status(res);
5166714d8e8SKurt Hackel 	if (status == DLM_NORMAL) {
5176714d8e8SKurt Hackel 		__dlm_lockres_reserve_ast(res);
518e2b5e450SKurt Hackel 		ast_reserved = 1;
5196714d8e8SKurt Hackel 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
5206714d8e8SKurt Hackel 		status = __dlmconvert_master(dlm, res, lock, flags,
5216714d8e8SKurt Hackel 					     cnv->requested_type,
5226714d8e8SKurt Hackel 					     &call_ast, &kick_thread);
5236714d8e8SKurt Hackel 		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
524a6fa3640SKurt Hackel 		wake = 1;
5256714d8e8SKurt Hackel 	}
5266714d8e8SKurt Hackel 	spin_unlock(&res->spinlock);
527a6fa3640SKurt Hackel 	if (wake)
528a6fa3640SKurt Hackel 		wake_up(&res->wq);
5296714d8e8SKurt Hackel 
5306714d8e8SKurt Hackel 	if (status != DLM_NORMAL) {
5316714d8e8SKurt Hackel 		if (status != DLM_NOTQUEUED)
5326714d8e8SKurt Hackel 			dlm_error(status);
5336714d8e8SKurt Hackel 		lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
5346714d8e8SKurt Hackel 	}
5356714d8e8SKurt Hackel 
5366714d8e8SKurt Hackel leave:
53790aaaf1cSKurt Hackel 	if (lock)
5386714d8e8SKurt Hackel 		dlm_lock_put(lock);
5396714d8e8SKurt Hackel 
540e2b5e450SKurt Hackel 	/* either queue the ast or release it, if reserved */
5416714d8e8SKurt Hackel 	if (call_ast)
5426714d8e8SKurt Hackel 		dlm_queue_ast(dlm, lock);
543e2b5e450SKurt Hackel 	else if (ast_reserved)
5446714d8e8SKurt Hackel 		dlm_lockres_release_ast(dlm, res);
5456714d8e8SKurt Hackel 
5466714d8e8SKurt Hackel 	if (kick_thread)
5476714d8e8SKurt Hackel 		dlm_kick_thread(dlm, res);
5486714d8e8SKurt Hackel 
5496714d8e8SKurt Hackel 	if (res)
5506714d8e8SKurt Hackel 		dlm_lockres_put(res);
5516714d8e8SKurt Hackel 
5526714d8e8SKurt Hackel 	dlm_put(dlm);
5536714d8e8SKurt Hackel 
5546714d8e8SKurt Hackel 	return status;
5556714d8e8SKurt Hackel }
556