xref: /openbmc/linux/fs/dlm/lockspace.c (revision 7587cdef)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 #include "ast.h"
28 
29 static int			ls_count;
30 static struct mutex		ls_lock;
31 static struct list_head		lslist;
32 static spinlock_t		lslist_lock;
33 static struct task_struct *	scand_task;
34 
35 
36 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 {
38 	ssize_t ret = len;
39 	int n;
40 	int rc = kstrtoint(buf, 0, &n);
41 
42 	if (rc)
43 		return rc;
44 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
45 	if (!ls)
46 		return -EINVAL;
47 
48 	switch (n) {
49 	case 0:
50 		dlm_ls_stop(ls);
51 		break;
52 	case 1:
53 		dlm_ls_start(ls);
54 		break;
55 	default:
56 		ret = -EINVAL;
57 	}
58 	dlm_put_lockspace(ls);
59 	return ret;
60 }
61 
62 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
63 {
64 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
65 
66 	if (rc)
67 		return rc;
68 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69 	wake_up(&ls->ls_uevent_wait);
70 	return len;
71 }
72 
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77 
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
81 
82 	if (rc)
83 		return rc;
84 	return len;
85 }
86 
87 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
88 {
89 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
90 }
91 
92 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
93 {
94 	int val;
95 	int rc = kstrtoint(buf, 0, &val);
96 
97 	if (rc)
98 		return rc;
99 	if (val == 1)
100 		set_bit(LSFL_NODIR, &ls->ls_flags);
101 	return len;
102 }
103 
104 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
105 {
106 	uint32_t status = dlm_recover_status(ls);
107 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
108 }
109 
110 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
111 {
112 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
113 }
114 
115 struct dlm_attr {
116 	struct attribute attr;
117 	ssize_t (*show)(struct dlm_ls *, char *);
118 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
119 };
120 
121 static struct dlm_attr dlm_attr_control = {
122 	.attr  = {.name = "control", .mode = S_IWUSR},
123 	.store = dlm_control_store
124 };
125 
126 static struct dlm_attr dlm_attr_event = {
127 	.attr  = {.name = "event_done", .mode = S_IWUSR},
128 	.store = dlm_event_store
129 };
130 
131 static struct dlm_attr dlm_attr_id = {
132 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
133 	.show  = dlm_id_show,
134 	.store = dlm_id_store
135 };
136 
137 static struct dlm_attr dlm_attr_nodir = {
138 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
139 	.show  = dlm_nodir_show,
140 	.store = dlm_nodir_store
141 };
142 
143 static struct dlm_attr dlm_attr_recover_status = {
144 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
145 	.show  = dlm_recover_status_show
146 };
147 
148 static struct dlm_attr dlm_attr_recover_nodeid = {
149 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
150 	.show  = dlm_recover_nodeid_show
151 };
152 
153 static struct attribute *dlm_attrs[] = {
154 	&dlm_attr_control.attr,
155 	&dlm_attr_event.attr,
156 	&dlm_attr_id.attr,
157 	&dlm_attr_nodir.attr,
158 	&dlm_attr_recover_status.attr,
159 	&dlm_attr_recover_nodeid.attr,
160 	NULL,
161 };
162 ATTRIBUTE_GROUPS(dlm);
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
186 static const struct sysfs_ops dlm_attr_ops = {
187 	.show  = dlm_attr_show,
188 	.store = dlm_attr_store,
189 };
190 
191 static struct kobj_type dlm_ktype = {
192 	.default_groups = dlm_groups,
193 	.sysfs_ops     = &dlm_attr_ops,
194 	.release       = lockspace_kobj_release,
195 };
196 
/*
 * The "dlm" kset created in dlm_lockspace_init(); each lockspace kobject is
 * assigned to it in new_lockspace().
 */
static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	if (in)
202 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203 	else
204 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205 
206 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207 
208 	/* dlm_controld will see the uevent, do the necessary group management
209 	   and then write to sysfs to wake us */
210 
211 	wait_event(ls->ls_uevent_wait,
212 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213 
214 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
215 
216 	return ls->ls_uevent_result;
217 }
218 
219 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
220 {
221 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222 
223 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 	return 0;
225 }
226 
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228 	.uevent = dlm_uevent,
229 };
230 
231 int __init dlm_lockspace_init(void)
232 {
233 	ls_count = 0;
234 	mutex_init(&ls_lock);
235 	INIT_LIST_HEAD(&lslist);
236 	spin_lock_init(&lslist_lock);
237 
238 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 	if (!dlm_kset) {
240 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 		return -ENOMEM;
242 	}
243 	return 0;
244 }
245 
246 void dlm_lockspace_exit(void)
247 {
248 	kset_unregister(dlm_kset);
249 }
250 
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 	struct dlm_ls *ls;
254 
255 	spin_lock(&lslist_lock);
256 	list_for_each_entry(ls, &lslist, ls_list) {
257 		if (time_after_eq(jiffies, ls->ls_scan_time +
258 					    dlm_config.ci_scan_secs * HZ)) {
259 			spin_unlock(&lslist_lock);
260 			return ls;
261 		}
262 	}
263 	spin_unlock(&lslist_lock);
264 	return NULL;
265 }
266 
267 static int dlm_scand(void *data)
268 {
269 	struct dlm_ls *ls;
270 
271 	while (!kthread_should_stop()) {
272 		ls = find_ls_to_scan();
273 		if (ls) {
274 			if (dlm_lock_recovery_try(ls)) {
275 				ls->ls_scan_time = jiffies;
276 				dlm_scan_rsbs(ls);
277 				dlm_scan_timeout(ls);
278 				dlm_unlock_recovery(ls);
279 			} else {
280 				ls->ls_scan_time += HZ;
281 			}
282 			continue;
283 		}
284 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
285 	}
286 	return 0;
287 }
288 
289 static int dlm_scand_start(void)
290 {
291 	struct task_struct *p;
292 	int error = 0;
293 
294 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
295 	if (IS_ERR(p))
296 		error = PTR_ERR(p);
297 	else
298 		scand_task = p;
299 	return error;
300 }
301 
302 static void dlm_scand_stop(void)
303 {
304 	kthread_stop(scand_task);
305 }
306 
307 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
308 {
309 	struct dlm_ls *ls;
310 
311 	spin_lock(&lslist_lock);
312 
313 	list_for_each_entry(ls, &lslist, ls_list) {
314 		if (ls->ls_global_id == id) {
315 			atomic_inc(&ls->ls_count);
316 			goto out;
317 		}
318 	}
319 	ls = NULL;
320  out:
321 	spin_unlock(&lslist_lock);
322 	return ls;
323 }
324 
325 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
326 {
327 	struct dlm_ls *ls;
328 
329 	spin_lock(&lslist_lock);
330 	list_for_each_entry(ls, &lslist, ls_list) {
331 		if (ls->ls_local_handle == lockspace) {
332 			atomic_inc(&ls->ls_count);
333 			goto out;
334 		}
335 	}
336 	ls = NULL;
337  out:
338 	spin_unlock(&lslist_lock);
339 	return ls;
340 }
341 
342 struct dlm_ls *dlm_find_lockspace_device(int minor)
343 {
344 	struct dlm_ls *ls;
345 
346 	spin_lock(&lslist_lock);
347 	list_for_each_entry(ls, &lslist, ls_list) {
348 		if (ls->ls_device.minor == minor) {
349 			atomic_inc(&ls->ls_count);
350 			goto out;
351 		}
352 	}
353 	ls = NULL;
354  out:
355 	spin_unlock(&lslist_lock);
356 	return ls;
357 }
358 
359 void dlm_put_lockspace(struct dlm_ls *ls)
360 {
361 	if (atomic_dec_and_test(&ls->ls_count))
362 		wake_up(&ls->ls_count_wait);
363 }
364 
365 static void remove_lockspace(struct dlm_ls *ls)
366 {
367 retry:
368 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
369 
370 	spin_lock(&lslist_lock);
371 	if (atomic_read(&ls->ls_count) != 0) {
372 		spin_unlock(&lslist_lock);
373 		goto retry;
374 	}
375 
376 	WARN_ON(ls->ls_create_count != 0);
377 	list_del(&ls->ls_list);
378 	spin_unlock(&lslist_lock);
379 }
380 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * midcomms.  If midcomms fails, dlm_scand is stopped again.
 *
 * Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
405 
/*
 * new_lockspace() - create and join a lockspace, or take another create
 * reference on an existing lockspace with the same name.
 *
 * @name:       lockspace name; its length must be 1..DLM_LOCKSPACE_LEN
 * @cluster:    optional cluster name; when set and recover callbacks are
 *              enabled it must match dlm_config.ci_cluster_name
 * @flags:      DLM_LSFL_* flags
 * @lvblen:     lock value block length; must be a multiple of 8
 * @ops:        optional recovery callbacks; only stored when
 *              dlm_config.ci_recover_callbacks is set
 * @ops_arg:    opaque argument stored alongside @ops
 * @ops_result: when @ops and @ops_result are both non-NULL, receives 0 or
 *              -EOPNOTSUPP depending on dlm_config.ci_recover_callbacks
 * @lockspace:  receives the lockspace handle when 0 or 1 is returned
 *
 * Returns 0 when a new lockspace was created and joined, 1 when an existing
 * lockspace was found and its ls_create_count was incremented, or a
 * negative errno.  A module reference is taken on entry and is dropped
 * again on every path except the "return 0" one.
 */
406 static int new_lockspace(const char *name, const char *cluster,
407 			 uint32_t flags, int lvblen,
408 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
409 			 int *ops_result, dlm_lockspace_t **lockspace)
410 {
411 	struct dlm_ls *ls;
412 	int i, size, error;
413 	int do_unreg = 0;
414 	int namelen = strlen(name);
415 
416 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
417 		return -EINVAL;
418 
419 	if (lvblen % 8)
420 		return -EINVAL;
421 
422 	if (!try_module_get(THIS_MODULE))
423 		return -EINVAL;
424 
425 	if (!dlm_user_daemon_available()) {
426 		log_print("dlm user daemon not available");
427 		error = -EUNATCH;
428 		goto out;
429 	}
430 
	/* Tell the caller whether its recovery callbacks will be used. */
431 	if (ops && ops_result) {
432 	       	if (!dlm_config.ci_recover_callbacks)
433 			*ops_result = -EOPNOTSUPP;
434 		else
435 			*ops_result = 0;
436 	}
437 
438 	if (!cluster)
439 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
440 			  dlm_config.ci_cluster_name);
441 
442 	if (dlm_config.ci_recover_callbacks && cluster &&
443 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
444 		log_print("dlm cluster name '%s' does not match "
445 			  "the application cluster name '%s'",
446 			  dlm_config.ci_cluster_name, cluster);
447 		error = -EBADR;
448 		goto out;
449 	}
450 
451 	error = 0;
452 
	/*
	 * Look for an existing lockspace with this name.  Afterwards error
	 * is 1 (existing one reused), -EEXIST (exists but DLM_LSFL_NEWEXCL
	 * was requested) or still 0 (none found, create a new one).
	 */
453 	spin_lock(&lslist_lock);
454 	list_for_each_entry(ls, &lslist, ls_list) {
455 		WARN_ON(ls->ls_create_count <= 0);
456 		if (ls->ls_namelen != namelen)
457 			continue;
458 		if (memcmp(ls->ls_name, name, namelen))
459 			continue;
460 		if (flags & DLM_LSFL_NEWEXCL) {
461 			error = -EEXIST;
462 			break;
463 		}
464 		ls->ls_create_count++;
465 		*lockspace = ls;
466 		error = 1;
467 		break;
468 	}
469 	spin_unlock(&lslist_lock);
470 
471 	if (error)
472 		goto out;
473 
474 	error = -ENOMEM;
475 
	/* namelen extra bytes hold the name copied into ls_name below */
476 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
477 	if (!ls)
478 		goto out;
479 	memcpy(ls->ls_name, name, namelen);
480 	ls->ls_namelen = namelen;
481 	ls->ls_lvblen = lvblen;
482 	atomic_set(&ls->ls_count, 0);
483 	init_waitqueue_head(&ls->ls_count_wait);
484 	ls->ls_flags = 0;
485 	ls->ls_scan_time = jiffies;
486 
487 	if (ops && dlm_config.ci_recover_callbacks) {
488 		ls->ls_ops = ops;
489 		ls->ls_ops_arg = ops_arg;
490 	}
491 
492 #ifdef CONFIG_DLM_DEPRECATED_API
493 	if (flags & DLM_LSFL_TIMEWARN) {
494 		pr_warn_once("===============================================================\n"
495 			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
496 			     "         will be removed in v6.2!\n"
497 			     "         Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
498 			     "===============================================================\n");
499 
500 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
501 	}
502 
503 	/* ls_exflags are forced to match among nodes, and we don't
504 	 * need to require all nodes to have some flags set
505 	 */
506 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
507 				    DLM_LSFL_NEWEXCL));
508 #else
509 	/* ls_exflags are forced to match among nodes, and we don't
510 	 * need to require all nodes to have some flags set
511 	 */
512 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
513 #endif
514 
	/* snapshot the configured table size once so all uses below agree */
515 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
516 	ls->ls_rsbtbl_size = size;
517 
518 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
519 	if (!ls->ls_rsbtbl)
520 		goto out_lsfree;
521 	for (i = 0; i < size; i++) {
522 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
523 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
524 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
525 	}
526 
527 	spin_lock_init(&ls->ls_remove_spin);
528 	init_waitqueue_head(&ls->ls_remove_wait);
529 
	/*
	 * On failure the out_rsbtbl path kfree()s every slot; slots not yet
	 * allocated are NULL because ls came from kzalloc().
	 */
530 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
531 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
532 						 GFP_KERNEL);
533 		if (!ls->ls_remove_names[i])
534 			goto out_rsbtbl;
535 	}
536 
537 	idr_init(&ls->ls_lkbidr);
538 	spin_lock_init(&ls->ls_lkbidr_spin);
539 
540 	INIT_LIST_HEAD(&ls->ls_waiters);
541 	mutex_init(&ls->ls_waiters_mutex);
542 	INIT_LIST_HEAD(&ls->ls_orphans);
543 	mutex_init(&ls->ls_orphans_mutex);
544 #ifdef CONFIG_DLM_DEPRECATED_API
545 	INIT_LIST_HEAD(&ls->ls_timeout);
546 	mutex_init(&ls->ls_timeout_mutex);
547 #endif
548 
549 	INIT_LIST_HEAD(&ls->ls_new_rsb);
550 	spin_lock_init(&ls->ls_new_rsb_spin);
551 
552 	INIT_LIST_HEAD(&ls->ls_nodes);
553 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
554 	ls->ls_num_nodes = 0;
555 	ls->ls_low_nodeid = 0;
556 	ls->ls_total_weight = 0;
557 	ls->ls_node_array = NULL;
558 
559 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
560 	ls->ls_stub_rsb.res_ls = ls;
561 
562 	ls->ls_debug_rsb_dentry = NULL;
563 	ls->ls_debug_waiters_dentry = NULL;
564 
565 	init_waitqueue_head(&ls->ls_uevent_wait);
566 	ls->ls_uevent_result = 0;
567 	init_completion(&ls->ls_recovery_done);
568 	ls->ls_recovery_result = -1;
569 
570 	mutex_init(&ls->ls_cb_mutex);
571 	INIT_LIST_HEAD(&ls->ls_cb_delay);
572 
573 	ls->ls_recoverd_task = NULL;
574 	mutex_init(&ls->ls_recoverd_active);
575 	spin_lock_init(&ls->ls_recover_lock);
576 	spin_lock_init(&ls->ls_rcom_spin);
577 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
578 	ls->ls_recover_status = 0;
579 	ls->ls_recover_seq = 0;
580 	ls->ls_recover_args = NULL;
581 	init_rwsem(&ls->ls_in_recovery);
582 	init_rwsem(&ls->ls_recv_active);
583 	INIT_LIST_HEAD(&ls->ls_requestqueue);
584 	atomic_set(&ls->ls_requestqueue_cnt, 0);
585 	init_waitqueue_head(&ls->ls_requestqueue_wait);
586 	mutex_init(&ls->ls_requestqueue_mutex);
587 	spin_lock_init(&ls->ls_clear_proc_locks);
588 
589 	/* Due backwards compatibility with 3.1 we need to use maximum
590 	 * possible dlm message size to be sure the message will fit and
591 	 * not having out of bounds issues. However on sending side 3.2
592 	 * might send less.
593 	 */
594 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
595 	if (!ls->ls_recover_buf)
596 		goto out_lkbidr;
597 
598 	ls->ls_slot = 0;
599 	ls->ls_num_slots = 0;
600 	ls->ls_slots_size = 0;
601 	ls->ls_slots = NULL;
602 
603 	INIT_LIST_HEAD(&ls->ls_recover_list);
604 	spin_lock_init(&ls->ls_recover_list_lock);
605 	idr_init(&ls->ls_recover_idr);
606 	spin_lock_init(&ls->ls_recover_idr_lock);
607 	ls->ls_recover_list_count = 0;
608 	ls->ls_local_handle = ls;
609 	init_waitqueue_head(&ls->ls_wait_general);
610 	INIT_LIST_HEAD(&ls->ls_root_list);
611 	init_rwsem(&ls->ls_root_sem);
612 
	/* Publish on lslist: from here the dlm_find_lockspace_*() lookups see ls. */
613 	spin_lock(&lslist_lock);
614 	ls->ls_create_count = 1;
615 	list_add(&ls->ls_list, &lslist);
616 	spin_unlock(&lslist_lock);
617 
618 	if (flags & DLM_LSFL_FS) {
619 		error = dlm_callback_start(ls);
620 		if (error) {
621 			log_error(ls, "can't start dlm_callback %d", error);
622 			goto out_delist;
623 		}
624 	}
625 
626 	init_waitqueue_head(&ls->ls_recover_lock_wait);
627 
628 	/*
629 	 * Once started, dlm_recoverd first looks for ls in lslist, then
630 	 * initializes ls_in_recovery as locked in "down" mode.  We need
631 	 * to wait for the wakeup from dlm_recoverd because in_recovery
632 	 * has to start out in down mode.
633 	 */
634 
635 	error = dlm_recoverd_start(ls);
636 	if (error) {
637 		log_error(ls, "can't start dlm_recoverd %d", error);
638 		goto out_callback;
639 	}
640 
641 	wait_event(ls->ls_recover_lock_wait,
642 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
643 
644 	/* let kobject handle freeing of ls if there's an error */
645 	do_unreg = 1;
646 
647 	ls->ls_kobj.kset = dlm_kset;
648 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
649 				     "%s", ls->ls_name);
650 	if (error)
651 		goto out_recoverd;
652 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
653 
654 	/* This uevent triggers dlm_controld in userspace to add us to the
655 	   group of nodes that are members of this lockspace (managed by the
656 	   cluster infrastructure.)  Once it's done that, it tells us who the
657 	   current lockspace members are (via configfs) and then tells the
658 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
659 
660 	error = do_uevent(ls, 1);
661 	if (error)
662 		goto out_recoverd;
663 
664 	/* wait until recovery is successful or failed */
665 	wait_for_completion(&ls->ls_recovery_done);
666 	error = ls->ls_recovery_result;
667 	if (error)
668 		goto out_members;
669 
670 	dlm_create_debug_file(ls);
671 
672 	log_rinfo(ls, "join complete");
673 	*lockspace = ls;
674 	return 0;
675 
	/*
	 * Error unwinding.  Each label undoes the setup steps that had
	 * completed before the corresponding failure point and then falls
	 * through to the labels for the earlier steps.
	 */
676  out_members:
677 	do_uevent(ls, 0);
678 	dlm_clear_members(ls);
679 	kfree(ls->ls_node_array);
680  out_recoverd:
681 	dlm_recoverd_stop(ls);
682  out_callback:
683 	dlm_callback_stop(ls);
684  out_delist:
685 	spin_lock(&lslist_lock);
686 	list_del(&ls->ls_list);
687 	spin_unlock(&lslist_lock);
688 	idr_destroy(&ls->ls_recover_idr);
689 	kfree(ls->ls_recover_buf);
690  out_lkbidr:
691 	idr_destroy(&ls->ls_lkbidr);
692  out_rsbtbl:
693 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
694 		kfree(ls->ls_remove_names[i]);
695 	vfree(ls->ls_rsbtbl);
696  out_lsfree:
	/*
	 * do_unreg is set just before kobject_init_and_add(); from then on
	 * ls is freed by lockspace_kobj_release() via kobject_put().
	 */
697 	if (do_unreg)
698 		kobject_put(&ls->ls_kobj);
699 	else
700 		kfree(ls);
701  out:
702 	module_put(THIS_MODULE);
703 	return error;
704 }
705 
706 static int __dlm_new_lockspace(const char *name, const char *cluster,
707 			       uint32_t flags, int lvblen,
708 			       const struct dlm_lockspace_ops *ops,
709 			       void *ops_arg, int *ops_result,
710 			       dlm_lockspace_t **lockspace)
711 {
712 	int error = 0;
713 
714 	mutex_lock(&ls_lock);
715 	if (!ls_count)
716 		error = threads_start();
717 	if (error)
718 		goto out;
719 
720 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
721 			      ops_result, lockspace);
722 	if (!error)
723 		ls_count++;
724 	if (error > 0)
725 		error = 0;
726 	if (!ls_count) {
727 		dlm_scand_stop();
728 		dlm_midcomms_shutdown();
729 		dlm_lowcomms_stop();
730 	}
731  out:
732 	mutex_unlock(&ls_lock);
733 	return error;
734 }
735 
736 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
737 		      int lvblen, const struct dlm_lockspace_ops *ops,
738 		      void *ops_arg, int *ops_result,
739 		      dlm_lockspace_t **lockspace)
740 {
741 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
742 				   ops, ops_arg, ops_result, lockspace);
743 }
744 
745 int dlm_new_user_lockspace(const char *name, const char *cluster,
746 			   uint32_t flags, int lvblen,
747 			   const struct dlm_lockspace_ops *ops,
748 			   void *ops_arg, int *ops_result,
749 			   dlm_lockspace_t **lockspace)
750 {
751 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
752 				   ops_arg, ops_result, lockspace);
753 }
754 
755 static int lkb_idr_is_local(int id, void *p, void *data)
756 {
757 	struct dlm_lkb *lkb = p;
758 
759 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
760 }
761 
/*
 * idr_for_each() callback that matches every entry: always returns 1, so
 * the iteration stops at the first lkb found.  All arguments are unused.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	(void)id;
	(void)p;
	(void)data;

	return 1;
}
766 
767 static int lkb_idr_free(int id, void *p, void *data)
768 {
769 	struct dlm_lkb *lkb = p;
770 
771 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
772 		dlm_free_lvb(lkb->lkb_lvbptr);
773 
774 	dlm_free_lkb(lkb);
775 	return 0;
776 }
777 
778 /* NOTE: We check the lkbidr here rather than the resource table.
779    This is because there may be LKBs queued as ASTs that have been unlinked
780    from their RSBs and are pending deletion once the AST has been delivered */
781 
782 static int lockspace_busy(struct dlm_ls *ls, int force)
783 {
784 	int rv;
785 
786 	spin_lock(&ls->ls_lkbidr_spin);
787 	if (force == 0) {
788 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
789 	} else if (force == 1) {
790 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
791 	} else {
792 		rv = 0;
793 	}
794 	spin_unlock(&ls->ls_lkbidr_spin);
795 	return rv;
796 }
797 
/*
 * release_lockspace() - drop one create reference on @ls and tear the
 * lockspace down when it was the last one.
 *
 * @ls:    lockspace to release
 * @force: passed to lockspace_busy(); values below 3 also trigger the
 *         "leaving" uevent when the user daemon is available
 *
 * Returns 0 when the lockspace was torn down, the remaining (positive)
 * ls_create_count when other creators still hold it, -EBUSY when the last
 * reference cannot be dropped because lockspace_busy() reported LKBs, or
 * -EINVAL when ls_create_count was not positive.  The module reference
 * taken in new_lockspace() is dropped only on the teardown path.
 */
798 static int release_lockspace(struct dlm_ls *ls, int force)
799 {
800 	struct dlm_rsb *rsb;
801 	struct rb_node *n;
802 	int i, busy, rv;
803 
	/* sampled before lslist_lock is taken; only consulted for the last reference */
804 	busy = lockspace_busy(ls, force);
805 
806 	spin_lock(&lslist_lock);
807 	if (ls->ls_create_count == 1) {
808 		if (busy) {
809 			rv = -EBUSY;
810 		} else {
811 			/* remove_lockspace takes ls off lslist */
812 			ls->ls_create_count = 0;
813 			rv = 0;
814 		}
815 	} else if (ls->ls_create_count > 1) {
816 		rv = --ls->ls_create_count;
817 	} else {
818 		rv = -EINVAL;
819 	}
820 	spin_unlock(&lslist_lock);
821 
	/* non-zero: an error, or other creators remain - nothing is torn down */
822 	if (rv) {
823 		log_debug(ls, "release_lockspace no remove %d", rv);
824 		return rv;
825 	}
826 
827 	dlm_device_deregister(ls);
828 
829 	if (force < 3 && dlm_user_daemon_available())
830 		do_uevent(ls, 0);
831 
832 	dlm_recoverd_stop(ls);
833 
	/* this is the last lockspace: also stop the shared scand thread and midcomms */
834 	if (ls_count == 1) {
835 		dlm_scand_stop();
836 		dlm_clear_members(ls);
837 		dlm_midcomms_shutdown();
838 	}
839 
840 	dlm_callback_stop(ls);
841 
	/* waits for ls_count to reach zero, then unlinks ls from lslist */
842 	remove_lockspace(ls);
843 
844 	dlm_delete_debug_file(ls);
845 
846 	idr_destroy(&ls->ls_recover_idr);
847 	kfree(ls->ls_recover_buf);
848 
849 	/*
850 	 * Free all lkb's in idr
851 	 */
852 
853 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
854 	idr_destroy(&ls->ls_lkbidr);
855 
856 	/*
857 	 * Free all rsb's on rsbtbl[] lists
858 	 */
859 
860 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
861 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
862 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
863 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
864 			dlm_free_rsb(rsb);
865 		}
866 
867 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
868 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
869 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
870 			dlm_free_rsb(rsb);
871 		}
872 	}
873 
874 	vfree(ls->ls_rsbtbl);
875 
876 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
877 		kfree(ls->ls_remove_names[i]);
878 
879 	while (!list_empty(&ls->ls_new_rsb)) {
880 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
881 				       res_hashchain);
882 		list_del(&rsb->res_hashchain);
883 		dlm_free_rsb(rsb);
884 	}
885 
886 	/*
887 	 * Free structures on any other lists
888 	 */
889 
890 	dlm_purge_requestqueue(ls);
891 	kfree(ls->ls_recover_args);
892 	dlm_clear_members(ls);
893 	dlm_clear_members_gone(ls);
894 	kfree(ls->ls_node_array);
895 	log_rinfo(ls, "release_lockspace final free");
896 	kobject_put(&ls->ls_kobj);
897 	/* The ls structure will be freed when the kobject is done with */
898 
899 	module_put(THIS_MODULE);
900 	return 0;
901 }
902 
903 /*
904  * Called when a system has released all its locks and is not going to use the
905  * lockspace any longer.  We free everything we're managing for this lockspace.
906  * Remaining nodes will go through the recovery process as if we'd died.  The
907  * lockspace must continue to function as usual, participating in recoveries,
908  * until this returns.
909  *
910  * Force has 4 possible values:
911  * 0 - don't destroy lockspace if it has any LKBs
912  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
913  * 2 - destroy lockspace regardless of LKBs
914  * 3 - destroy lockspace as part of a forced shutdown
915  */
916 
917 int dlm_release_lockspace(void *lockspace, int force)
918 {
919 	struct dlm_ls *ls;
920 	int error;
921 
922 	ls = dlm_find_lockspace_local(lockspace);
923 	if (!ls)
924 		return -EINVAL;
925 	dlm_put_lockspace(ls);
926 
927 	mutex_lock(&ls_lock);
928 	error = release_lockspace(ls, force);
929 	if (!error)
930 		ls_count--;
931 	if (!ls_count)
932 		dlm_lowcomms_stop();
933 	mutex_unlock(&ls_lock);
934 
935 	return error;
936 }
937 
938 void dlm_stop_lockspaces(void)
939 {
940 	struct dlm_ls *ls;
941 	int count;
942 
943  restart:
944 	count = 0;
945 	spin_lock(&lslist_lock);
946 	list_for_each_entry(ls, &lslist, ls_list) {
947 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
948 			count++;
949 			continue;
950 		}
951 		spin_unlock(&lslist_lock);
952 		log_error(ls, "no userland control daemon, stopping lockspace");
953 		dlm_ls_stop(ls);
954 		goto restart;
955 	}
956 	spin_unlock(&lslist_lock);
957 
958 	if (count)
959 		log_print("dlm user daemon left %d lockspaces", count);
960 }
961 
962 void dlm_stop_lockspaces_check(void)
963 {
964 	struct dlm_ls *ls;
965 
966 	spin_lock(&lslist_lock);
967 	list_for_each_entry(ls, &lslist, ls_list) {
968 		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
969 			    !dlm_locking_stopped(ls)))
970 			break;
971 	}
972 	spin_unlock(&lslist_lock);
973 }
974