xref: /openbmc/linux/fs/ocfs2/dlm/dlmlock.c (revision 87c2ce3b)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmlock.c
5  *
6  * underlying calls for lock creation
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42 
43 
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47 
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 
51 #include "dlmconvert.h"
52 
53 #define MLOG_MASK_PREFIX ML_DLM
54 #include "cluster/masklog.h"
55 
56 static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
57 static u64 dlm_next_cookie = 1;
58 
59 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
60 					       struct dlm_lock_resource *res,
61 					       struct dlm_lock *lock, int flags);
62 static void dlm_init_lock(struct dlm_lock *newlock, int type,
63 			  u8 node, u64 cookie);
64 static void dlm_lock_release(struct kref *kref);
65 static void dlm_lock_detach_lockres(struct dlm_lock *lock);
66 
67 /* Tell us whether we can grant a new lock request.
68  * locking:
69  *   caller needs:  res->spinlock
70  *   taken:         none
71  *   held on exit:  none
72  * returns: 1 if the lock can be granted, 0 otherwise.
73  */
74 static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
75 				  struct dlm_lock *lock)
76 {
77 	struct list_head *iter;
78 	struct dlm_lock *tmplock;
79 
80 	list_for_each(iter, &res->granted) {
81 		tmplock = list_entry(iter, struct dlm_lock, list);
82 
83 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
84 			return 0;
85 	}
86 
87 	list_for_each(iter, &res->converting) {
88 		tmplock = list_entry(iter, struct dlm_lock, list);
89 
90 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
91 			return 0;
92 	}
93 
94 	return 1;
95 }
96 
/* performs lock creation at the lockres master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_NOTQUEUED
 */
static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	int call_ast = 0, kick_thread = 0;
	enum dlm_status status = DLM_NORMAL;

	mlog_entry("type=%d\n", lock->ml.type);

	spin_lock(&res->spinlock);
	/* if called from dlm_create_lock_handler, need to
	 * ensure it will not sleep in dlm_wait_on_lockres */
	status = __dlm_lockres_state_to_status(res);
	if (status != DLM_NORMAL &&
	    lock->ml.node != dlm->node_num) {
		/* erf.  state changed after lock was dropped. */
		spin_unlock(&res->spinlock);
		dlm_error(status);
		return status;
	}
	__dlm_wait_on_lockres(res);
	/* reserve an ast slot up front; if we end up not queueing an
	 * ast below, the reservation is handed back via
	 * dlm_lockres_release_ast() */
	__dlm_lockres_reserve_ast(res);

	if (dlm_can_grant_new_lock(res, lock)) {
		mlog(0, "I can grant this lock right away\n");
		/* got it right away */
		lock->lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		dlm_lock_get(lock);	/* ref held by the granted queue */
		list_add_tail(&lock->list, &res->granted);

		/* for the recovery lock, we can't allow the ast
		 * to be queued since the dlmthread is already
		 * frozen.  but the recovery lock is always locked
		 * with LKM_NOQUEUE so we do not need the ast in
		 * this special case */
		if (!dlm_is_recovery_lock(res->lockname.name,
					  res->lockname.len)) {
			kick_thread = 1;
			call_ast = 1;
		}
	} else {
		/* for NOQUEUE request, unless we get the
		 * lock right away, return DLM_NOTQUEUED */
		if (flags & LKM_NOQUEUE)
			status = DLM_NOTQUEUED;
		else {
			dlm_lock_get(lock);	/* ref held by the blocked queue */
			list_add_tail(&lock->list, &res->blocked);
			kick_thread = 1;
		}
	}

	spin_unlock(&res->spinlock);
	/* wake anyone sleeping in dlm_wait_on_lockres() */
	wake_up(&res->wq);

	/* either queue the ast or release it */
	if (call_ast)
		dlm_queue_ast(dlm, lock);
	else
		dlm_lockres_release_ast(dlm, res);

	dlm_lockres_calc_usage(dlm, res);
	if (kick_thread)
		dlm_kick_thread(dlm, res);

	return status;
}
172 
173 void dlm_revert_pending_lock(struct dlm_lock_resource *res,
174 			     struct dlm_lock *lock)
175 {
176 	/* remove from local queue if it failed */
177 	list_del_init(&lock->list);
178 	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
179 }
180 
181 
/*
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 */
static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	enum dlm_status status = DLM_DENIED;

	mlog_entry("type=%d\n", lock->ml.type);
	mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
	     res->lockname.name, flags);

	spin_lock(&res->spinlock);

	/* will exit this call with spinlock held */
	__dlm_wait_on_lockres(res);
	/* keep recovery/migration off this lockres while the network
	 * request is in flight */
	res->state |= DLM_LOCK_RES_IN_PROGRESS;

	/* add lock to local (secondary) queue */
	dlm_lock_get(lock);	/* ref held by the blocked queue */
	list_add_tail(&lock->list, &res->blocked);
	/* mark the lock so recovery can see a create is outstanding */
	lock->lock_pending = 1;
	spin_unlock(&res->spinlock);

	/* spec seems to say that you will get DLM_NORMAL when the lock
	 * has been queued, meaning we need to wait for a reply here. */
	status = dlm_send_remote_lock_request(dlm, res, lock, flags);

	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	lock->lock_pending = 0;
	if (status != DLM_NORMAL) {
		/* DLM_NOTQUEUED is the expected answer for a LKM_NOQUEUE
		 * request that could not be granted immediately, so it is
		 * not logged as an error */
		if (status != DLM_NOTQUEUED)
			dlm_error(status);
		/* take the lock back off the blocked queue and drop the
		 * ref the queue was holding */
		dlm_revert_pending_lock(res, lock);
		dlm_lock_put(lock);
	}
	spin_unlock(&res->spinlock);

	dlm_lockres_calc_usage(dlm, res);

	wake_up(&res->wq);
	return status;
}
231 
232 
233 /* for remote lock creation.
234  * locking:
235  *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
236  *   taken:         none
237  *   held on exit:  none
238  * returns: DLM_NOLOCKMGR, or net status
239  */
240 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
241 					       struct dlm_lock_resource *res,
242 					       struct dlm_lock *lock, int flags)
243 {
244 	struct dlm_create_lock create;
245 	int tmpret, status = 0;
246 	enum dlm_status ret;
247 
248 	mlog_entry_void();
249 
250 	memset(&create, 0, sizeof(create));
251 	create.node_idx = dlm->node_num;
252 	create.requested_type = lock->ml.type;
253 	create.cookie = lock->ml.cookie;
254 	create.namelen = res->lockname.len;
255 	create.flags = cpu_to_be32(flags);
256 	memcpy(create.name, res->lockname.name, create.namelen);
257 
258 	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
259 				    sizeof(create), res->owner, &status);
260 	if (tmpret >= 0) {
261 		// successfully sent and received
262 		ret = status;  // this is already a dlm_status
263 	} else {
264 		mlog_errno(tmpret);
265 		if (dlm_is_host_down(tmpret)) {
266 			ret = DLM_RECOVERING;
267 			mlog(0, "node %u died so returning DLM_RECOVERING "
268 			     "from lock message!\n", res->owner);
269 		} else {
270 			ret = dlm_err_to_dlm_status(tmpret);
271 		}
272 	}
273 
274 	return ret;
275 }
276 
/* take a reference on a dlm_lock */
void dlm_lock_get(struct dlm_lock *lock)
{
	kref_get(&lock->lock_refs);
}
281 
/* drop a reference on a dlm_lock; the last put frees it via
 * dlm_lock_release() */
void dlm_lock_put(struct dlm_lock *lock)
{
	kref_put(&lock->lock_refs, dlm_lock_release);
}
286 
287 static void dlm_lock_release(struct kref *kref)
288 {
289 	struct dlm_lock *lock;
290 
291 	lock = container_of(kref, struct dlm_lock, lock_refs);
292 
293 	BUG_ON(!list_empty(&lock->list));
294 	BUG_ON(!list_empty(&lock->ast_list));
295 	BUG_ON(!list_empty(&lock->bast_list));
296 	BUG_ON(lock->ast_pending);
297 	BUG_ON(lock->bast_pending);
298 
299 	dlm_lock_detach_lockres(lock);
300 
301 	if (lock->lksb_kernel_allocated) {
302 		mlog(0, "freeing kernel-allocated lksb\n");
303 		kfree(lock->lksb);
304 	}
305 	kfree(lock);
306 }
307 
/* associate a lock with it's lockres, getting a ref on the lockres */
void dlm_lock_attach_lockres(struct dlm_lock *lock,
			     struct dlm_lock_resource *res)
{
	/* ref is dropped later by dlm_lock_detach_lockres() */
	dlm_lockres_get(res);
	lock->lockres = res;
}
315 
316 /* drop ref on lockres, if there is still one associated with lock */
317 static void dlm_lock_detach_lockres(struct dlm_lock *lock)
318 {
319 	struct dlm_lock_resource *res;
320 
321 	res = lock->lockres;
322 	if (res) {
323 		lock->lockres = NULL;
324 		mlog(0, "removing lock's lockres reference\n");
325 		dlm_lockres_put(res);
326 	}
327 }
328 
329 static void dlm_init_lock(struct dlm_lock *newlock, int type,
330 			  u8 node, u64 cookie)
331 {
332 	INIT_LIST_HEAD(&newlock->list);
333 	INIT_LIST_HEAD(&newlock->ast_list);
334 	INIT_LIST_HEAD(&newlock->bast_list);
335 	spin_lock_init(&newlock->spinlock);
336 	newlock->ml.type = type;
337 	newlock->ml.convert_type = LKM_IVMODE;
338 	newlock->ml.highest_blocked = LKM_IVMODE;
339 	newlock->ml.node = node;
340 	newlock->ml.pad1 = 0;
341 	newlock->ml.list = 0;
342 	newlock->ml.flags = 0;
343 	newlock->ast = NULL;
344 	newlock->bast = NULL;
345 	newlock->astdata = NULL;
346 	newlock->ml.cookie = cpu_to_be64(cookie);
347 	newlock->ast_pending = 0;
348 	newlock->bast_pending = 0;
349 	newlock->convert_pending = 0;
350 	newlock->lock_pending = 0;
351 	newlock->unlock_pending = 0;
352 	newlock->cancel_pending = 0;
353 	newlock->lksb_kernel_allocated = 0;
354 
355 	kref_init(&newlock->lock_refs);
356 }
357 
358 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
359 			       struct dlm_lockstatus *lksb)
360 {
361 	struct dlm_lock *lock;
362 	int kernel_allocated = 0;
363 
364 	lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
365 	if (!lock)
366 		return NULL;
367 
368 	if (!lksb) {
369 		/* zero memory only if kernel-allocated */
370 		lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
371 		if (!lksb) {
372 			kfree(lock);
373 			return NULL;
374 		}
375 		kernel_allocated = 1;
376 	}
377 
378 	dlm_init_lock(lock, type, node, cookie);
379 	if (kernel_allocated)
380 		lock->lksb_kernel_allocated = 1;
381 	lock->lksb = lksb;
382 	lksb->lockid = lock;
383 	return lock;
384 }
385 
/* handler for lock creation net message
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
 */
int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *newlock = NULL;
	struct dlm_lockstatus *lksb = NULL;
	enum dlm_status status = DLM_NORMAL;
	char *name;
	unsigned int namelen;

	BUG_ON(!dlm);

	mlog_entry_void();

	/* take a ref on the domain; dropped via dlm_put() below */
	if (!dlm_grab(dlm))
		return DLM_REJECTED;

	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
			"Domain %s not fully joined!\n", dlm->name);

	name = create->name;
	namelen = create->namelen;

	/* status is preset before each step so that a bare
	 * "goto leave" reports the right error */
	status = DLM_IVBUFLEN;
	if (namelen > DLM_LOCKID_NAME_MAX) {
		dlm_error(status);
		goto leave;
	}

	status = DLM_SYSERR;
	/* NULL lksb: dlm_new_lock allocates one owned by this lock */
	newlock = dlm_new_lock(create->requested_type,
			       create->node_idx,
			       be64_to_cpu(create->cookie), NULL);
	if (!newlock) {
		dlm_error(status);
		goto leave;
	}

	lksb = newlock->lksb;

	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
		lksb->flags |= DLM_LKSB_GET_LVB;
		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
	}

	status = DLM_IVLOCKID;
	/* takes a ref on res; dropped below */
	res = dlm_lookup_lockres(dlm, name, namelen);
	if (!res) {
		dlm_error(status);
		goto leave;
	}

	spin_lock(&res->spinlock);
	status = __dlm_lockres_state_to_status(res);
	spin_unlock(&res->spinlock);

	if (status != DLM_NORMAL) {
		mlog(0, "lockres recovering/migrating/in-progress\n");
		goto leave;
	}

	dlm_lock_attach_lockres(newlock, res);

	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
leave:
	/* on any failure, drop the initial ref so the new lock (and
	 * its kernel-allocated lksb) are freed */
	if (status != DLM_NORMAL)
		if (newlock)
			dlm_lock_put(newlock);

	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}
470 
471 
472 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
473 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
474 {
475 	u64 tmpnode = node_num;
476 
477 	/* shift single byte of node num into top 8 bits */
478 	tmpnode <<= 56;
479 
480 	spin_lock(&dlm_cookie_lock);
481 	*cookie = (dlm_next_cookie | tmpnode);
482 	if (++dlm_next_cookie & 0xff00000000000000ull) {
483 		mlog(0, "This node's cookie will now wrap!\n");
484 		dlm_next_cookie = 1;
485 	}
486 	spin_unlock(&dlm_cookie_lock);
487 }
488 
/* main entry point for lock creation and conversion.
 * @dlm:   lock domain
 * @mode:  requested mode (LKM_NLMODE/LKM_PRMODE/LKM_EXMODE)
 * @lksb:  caller's lock status block; identifies the lock on convert
 * @flags: LKM_* flags; LKM_CONVERT selects the convert path
 * @name:  lockres name (new-lock path only)
 * @ast/@bast/@data: completion callbacks and their argument
 * returns DLM_NORMAL on success or a dlm_status error; retries
 * internally while the lockres is recovering/migrating. */
enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
			struct dlm_lockstatus *lksb, int flags,
			const char *name, dlm_astlockfunc_t *ast, void *data,
			dlm_bastlockfunc_t *bast)
{
	enum dlm_status status;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *lock = NULL;
	int convert = 0, recovery = 0;

	/* yes this function is a mess.
	 * TODO: clean this up.  lots of common code in the
	 *       lock and convert paths, especially in the retry blocks */
	if (!lksb) {
		dlm_error(DLM_BADARGS);
		return DLM_BADARGS;
	}

	status = DLM_BADPARAM;
	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
		dlm_error(status);
		goto error;
	}

	if (flags & ~LKM_VALID_FLAGS) {
		dlm_error(status);
		goto error;
	}

	convert = (flags & LKM_CONVERT);
	recovery = (flags & LKM_RECOVERY);

	/* LKM_RECOVERY is only valid for a fresh lock on the special
	 * recovery lockres */
	if (recovery &&
	    (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
		dlm_error(status);
		goto error;
	}
	if (convert && (flags & LKM_LOCAL)) {
		mlog(ML_ERROR, "strange LOCAL convert request!\n");
		goto error;
	}

	if (convert) {
		/* CONVERT request */

		/* if converting, must pass in a valid dlm_lock */
		lock = lksb->lockid;
		if (!lock) {
			mlog(ML_ERROR, "NULL lock pointer in convert "
			     "request\n");
			goto error;
		}

		res = lock->lockres;
		if (!res) {
			mlog(ML_ERROR, "NULL lockres pointer in convert "
			     "request\n");
			goto error;
		}
		/* extra ref for the duration of this call; dropped in
		 * the common exit path below */
		dlm_lockres_get(res);

		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
	 	 * static after the original lock call.  convert requests will
		 * ensure that everything is the same, or return DLM_BADARGS.
	 	 * this means that DLM_DENIED_NOASTS will never be returned.
	 	 */
		if (lock->lksb != lksb || lock->ast != ast ||
		    lock->bast != bast || lock->astdata != data) {
			status = DLM_BADARGS;
			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
			     "astdata=%p\n", lksb, ast, bast, data);
			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
			     "astdata=%p\n", lock->lksb, lock->ast,
			     lock->bast, lock->astdata);
			goto error;
		}
retry_convert:
		dlm_wait_for_recovery(dlm);

		/* master vs non-master take different convert paths */
		if (res->owner == dlm->node_num)
			status = dlmconvert_master(dlm, res, lock, flags, mode);
		else
			status = dlmconvert_remote(dlm, res, lock, flags, mode);
		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
		    status == DLM_FORWARD) {
			/* for now, see how this works without sleeping
			 * and just retry right away.  I suspect the reco
			 * or migration will complete fast enough that
			 * no waiting will be necessary */
			mlog(0, "retrying convert with migration/recovery/"
			     "in-progress\n");
			msleep(100);
			goto retry_convert;
		}
	} else {
		u64 tmpcookie;

		/* LOCK request */
		status = DLM_BADARGS;
		if (!name) {
			dlm_error(status);
			goto error;
		}

		status = DLM_IVBUFLEN;
		if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
			dlm_error(status);
			goto error;
		}

		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
		/* creates the lock with one ref, dropped in the error
		 * path below on failure */
		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
		if (!lock) {
			dlm_error(status);
			goto error;
		}

		/* the recovery lock itself must be usable during
		 * recovery, so only non-recovery requests wait here */
		if (!recovery)
			dlm_wait_for_recovery(dlm);

		/* find or create the lock resource */
		res = dlm_get_lock_resource(dlm, name, flags);
		if (!res) {
			status = DLM_IVLOCKID;
			dlm_error(status);
			goto error;
		}

		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);

		dlm_lock_attach_lockres(lock, res);
		lock->ast = ast;
		lock->bast = bast;
		lock->astdata = data;

retry_lock:
		if (flags & LKM_VALBLK) {
			mlog(0, "LKM_VALBLK passed by caller\n");

			/* LVB requests for non PR, PW or EX locks are
			 * ignored. */
			if (mode < LKM_PRMODE)
				flags &= ~LKM_VALBLK;
			else {
				flags |= LKM_GET_LVB;
				lock->lksb->flags |= DLM_LKSB_GET_LVB;
			}
		}

		if (res->owner == dlm->node_num)
			status = dlmlock_master(dlm, res, lock, flags);
		else
			status = dlmlock_remote(dlm, res, lock, flags);

		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
		    status == DLM_FORWARD) {
			mlog(0, "retrying lock with migration/"
			     "recovery/in progress\n");
			msleep(100);
			dlm_wait_for_recovery(dlm);
			goto retry_lock;
		}

		if (status != DLM_NORMAL) {
			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
			/* DLM_NOTQUEUED is expected for LKM_NOQUEUE */
			if (status != DLM_NOTQUEUED)
				dlm_error(status);
			goto error;
		}
	}

error:
	if (status != DLM_NORMAL) {
		/* on the convert path the caller still owns the lock;
		 * only drop the ref for a lock created here */
		if (lock && !convert)
			dlm_lock_put(lock);
		// this is kind of unnecessary
		lksb->status = status;
	}

	/* put lockres ref from the convert path
	 * or from dlm_get_lock_resource */
	if (res)
		dlm_lockres_put(res);

	return status;
}
EXPORT_SYMBOL_GPL(dlmlock);
677