xref: /openbmc/linux/fs/dlm/lockspace.c (revision 72ed5d5624af384eaf74d84915810d54486a75e2)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162 
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 			     char *buf)
165 {
166 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 	return a->show ? a->show(ls, buf) : 0;
169 }
170 
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 			      const char *buf, size_t len)
173 {
174 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 	return a->store ? a->store(ls, buf, len) : len;
177 }
178 
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182 	kfree(ls);
183 }
184 
185 static const struct sysfs_ops dlm_attr_ops = {
186 	.show  = dlm_attr_show,
187 	.store = dlm_attr_store,
188 };
189 
190 static struct kobj_type dlm_ktype = {
191 	.default_groups = dlm_groups,
192 	.sysfs_ops     = &dlm_attr_ops,
193 	.release       = lockspace_kobj_release,
194 };
195 
196 static struct kset *dlm_kset;
197 
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 	if (in)
201 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 	else
203 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204 
205 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206 
207 	/* dlm_controld will see the uevent, do the necessary group management
208 	   and then write to sysfs to wake us */
209 
210 	wait_event(ls->ls_uevent_wait,
211 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212 
213 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214 
215 	return ls->ls_uevent_result;
216 }
217 
218 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
219 {
220 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
221 
222 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223 	return 0;
224 }
225 
226 static const struct kset_uevent_ops dlm_uevent_ops = {
227 	.uevent = dlm_uevent,
228 };
229 
230 int __init dlm_lockspace_init(void)
231 {
232 	ls_count = 0;
233 	mutex_init(&ls_lock);
234 	INIT_LIST_HEAD(&lslist);
235 	spin_lock_init(&lslist_lock);
236 
237 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
238 	if (!dlm_kset) {
239 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
240 		return -ENOMEM;
241 	}
242 	return 0;
243 }
244 
245 void dlm_lockspace_exit(void)
246 {
247 	kset_unregister(dlm_kset);
248 }
249 
250 static struct dlm_ls *find_ls_to_scan(void)
251 {
252 	struct dlm_ls *ls;
253 
254 	spin_lock(&lslist_lock);
255 	list_for_each_entry(ls, &lslist, ls_list) {
256 		if (time_after_eq(jiffies, ls->ls_scan_time +
257 					    dlm_config.ci_scan_secs * HZ)) {
258 			spin_unlock(&lslist_lock);
259 			return ls;
260 		}
261 	}
262 	spin_unlock(&lslist_lock);
263 	return NULL;
264 }
265 
266 static int dlm_scand(void *data)
267 {
268 	struct dlm_ls *ls;
269 
270 	while (!kthread_should_stop()) {
271 		ls = find_ls_to_scan();
272 		if (ls) {
273 			if (dlm_lock_recovery_try(ls)) {
274 				ls->ls_scan_time = jiffies;
275 				dlm_scan_rsbs(ls);
276 				dlm_scan_timeout(ls);
277 				dlm_unlock_recovery(ls);
278 			} else {
279 				ls->ls_scan_time += HZ;
280 			}
281 			continue;
282 		}
283 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
284 	}
285 	return 0;
286 }
287 
288 static int dlm_scand_start(void)
289 {
290 	struct task_struct *p;
291 	int error = 0;
292 
293 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
294 	if (IS_ERR(p))
295 		error = PTR_ERR(p);
296 	else
297 		scand_task = p;
298 	return error;
299 }
300 
301 static void dlm_scand_stop(void)
302 {
303 	kthread_stop(scand_task);
304 }
305 
306 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
307 {
308 	struct dlm_ls *ls;
309 
310 	spin_lock(&lslist_lock);
311 
312 	list_for_each_entry(ls, &lslist, ls_list) {
313 		if (ls->ls_global_id == id) {
314 			atomic_inc(&ls->ls_count);
315 			goto out;
316 		}
317 	}
318 	ls = NULL;
319  out:
320 	spin_unlock(&lslist_lock);
321 	return ls;
322 }
323 
324 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
325 {
326 	struct dlm_ls *ls;
327 
328 	spin_lock(&lslist_lock);
329 	list_for_each_entry(ls, &lslist, ls_list) {
330 		if (ls->ls_local_handle == lockspace) {
331 			atomic_inc(&ls->ls_count);
332 			goto out;
333 		}
334 	}
335 	ls = NULL;
336  out:
337 	spin_unlock(&lslist_lock);
338 	return ls;
339 }
340 
341 struct dlm_ls *dlm_find_lockspace_device(int minor)
342 {
343 	struct dlm_ls *ls;
344 
345 	spin_lock(&lslist_lock);
346 	list_for_each_entry(ls, &lslist, ls_list) {
347 		if (ls->ls_device.minor == minor) {
348 			atomic_inc(&ls->ls_count);
349 			goto out;
350 		}
351 	}
352 	ls = NULL;
353  out:
354 	spin_unlock(&lslist_lock);
355 	return ls;
356 }
357 
358 void dlm_put_lockspace(struct dlm_ls *ls)
359 {
360 	if (atomic_dec_and_test(&ls->ls_count))
361 		wake_up(&ls->ls_count_wait);
362 }
363 
364 static void remove_lockspace(struct dlm_ls *ls)
365 {
366 retry:
367 	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
368 
369 	spin_lock(&lslist_lock);
370 	if (atomic_read(&ls->ls_count) != 0) {
371 		spin_unlock(&lslist_lock);
372 		goto retry;
373 	}
374 
375 	WARN_ON(ls->ls_create_count != 0);
376 	list_del(&ls->ls_list);
377 	spin_unlock(&lslist_lock);
378 }
379 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * midcomms.  If midcomms cannot be started, dlm_scand is stopped again.
 * Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* midcomms sends and receives the messages of every lockspace */
	error = dlm_midcomms_start();
	if (error) {
		log_print("cannot start dlm midcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
404 
/*
 * new_lockspace - attach to an existing lockspace by name or create a new one.
 *
 * name must be 1..DLM_LOCKSPACE_LEN characters and lvblen a multiple of 8,
 * otherwise -EINVAL is returned.  A module reference is taken up front and
 * dropped again on every path except successful creation.
 *
 * If ops and ops_result are both given, *ops_result is set to -EOPNOTSUPP
 * when dlm_config.ci_recover_callbacks is off and to 0 otherwise.
 *
 * If lslist already holds a lockspace with this name: with DLM_LSFL_NEWEXCL
 * the call fails with -EEXIST; otherwise its ls_create_count is bumped, it is
 * stored in *lockspace and 1 is returned.
 *
 * Otherwise a struct dlm_ls is allocated and initialised, put on lslist, the
 * callback machinery (DLM_LSFL_FS only) and dlm_recoverd are started, the
 * kobject is registered, an online uevent is sent and answered by userspace,
 * and the first recovery is awaited.
 *
 * Returns 0 with *lockspace set for a newly created lockspace, 1 for a reused
 * one, or a negative errno.  Failures unwind through the out_* labels in the
 * reverse order of setup.
 */
405 static int new_lockspace(const char *name, const char *cluster,
406 			 uint32_t flags, int lvblen,
407 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
408 			 int *ops_result, dlm_lockspace_t **lockspace)
409 {
410 	struct dlm_ls *ls;
411 	int i, size, error;
	/* set once ls must be freed through kobject_put() instead of kfree() */
412 	int do_unreg = 0;
413 	int namelen = strlen(name);
414 
415 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
416 		return -EINVAL;
417 
418 	if (lvblen % 8)
419 		return -EINVAL;
420 
421 	if (!try_module_get(THIS_MODULE))
422 		return -EINVAL;
423 
424 	if (!dlm_user_daemon_available()) {
425 		log_print("dlm user daemon not available");
426 		error = -EUNATCH;
427 		goto out;
428 	}
429 
	/* tell the caller whether its recovery callbacks will be used */
430 	if (ops && ops_result) {
431 	       	if (!dlm_config.ci_recover_callbacks)
432 			*ops_result = -EOPNOTSUPP;
433 		else
434 			*ops_result = 0;
435 	}
436 
437 	if (!cluster)
438 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
439 			  dlm_config.ci_cluster_name);
440 
441 	if (dlm_config.ci_recover_callbacks && cluster &&
442 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443 		log_print("dlm cluster name '%s' does not match "
444 			  "the application cluster name '%s'",
445 			  dlm_config.ci_cluster_name, cluster);
446 		error = -EBADR;
447 		goto out;
448 	}
449 
450 	error = 0;
451 
	/* look for an existing lockspace with the same name */
452 	spin_lock(&lslist_lock);
453 	list_for_each_entry(ls, &lslist, ls_list) {
454 		WARN_ON(ls->ls_create_count <= 0);
455 		if (ls->ls_namelen != namelen)
456 			continue;
457 		if (memcmp(ls->ls_name, name, namelen))
458 			continue;
459 		if (flags & DLM_LSFL_NEWEXCL) {
460 			error = -EEXIST;
461 			break;
462 		}
463 		ls->ls_create_count++;
464 		*lockspace = ls;
465 		error = 1;
466 		break;
467 	}
468 	spin_unlock(&lslist_lock);
469 
	/* error is 1 (existing lockspace reused) or -EEXIST at this point */
470 	if (error)
471 		goto out;
472 
	/* the allocation failures below all leave with -ENOMEM */
473 	error = -ENOMEM;
474 
475 	ls = kzalloc(sizeof(*ls), GFP_NOFS);
476 	if (!ls)
477 		goto out;
478 	memcpy(ls->ls_name, name, namelen);
479 	ls->ls_namelen = namelen;
480 	ls->ls_lvblen = lvblen;
481 	atomic_set(&ls->ls_count, 0);
482 	init_waitqueue_head(&ls->ls_count_wait);
483 	ls->ls_flags = 0;
484 	ls->ls_scan_time = jiffies;
485 
486 	if (ops && dlm_config.ci_recover_callbacks) {
487 		ls->ls_ops = ops;
488 		ls->ls_ops_arg = ops_arg;
489 	}
490 
491 #ifdef CONFIG_DLM_DEPRECATED_API
492 	if (flags & DLM_LSFL_TIMEWARN) {
493 		pr_warn_once("===============================================================\n"
494 			     "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
495 			     "         will be removed in v6.2!\n"
496 			     "         Inclusive DLM_LSFL_TIMEWARN define in UAPI header!\n"
497 			     "===============================================================\n");
498 
499 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
500 	}
501 
502 	/* ls_exflags are forced to match among nodes, and we don't
503 	 * need to require all nodes to have some flags set
504 	 */
505 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
506 				    DLM_LSFL_NEWEXCL));
507 #else
508 	/* ls_exflags are forced to match among nodes, and we don't
509 	 * need to require all nodes to have some flags set
510 	 */
511 	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
512 #endif
513 
	/* read the configured table size once and use that value throughout */
514 	size = READ_ONCE(dlm_config.ci_rsbtbl_size);
515 	ls->ls_rsbtbl_size = size;
516 
517 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
518 	if (!ls->ls_rsbtbl)
519 		goto out_lsfree;
520 	for (i = 0; i < size; i++) {
521 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
522 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
523 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
524 	}
525 
	/* on failure out_rsbtbl frees all slots; unfilled ones are NULL from kzalloc */
526 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528 						 GFP_KERNEL);
529 		if (!ls->ls_remove_names[i])
530 			goto out_rsbtbl;
531 	}
532 
533 	idr_init(&ls->ls_lkbidr);
534 	spin_lock_init(&ls->ls_lkbidr_spin);
535 
536 	INIT_LIST_HEAD(&ls->ls_waiters);
537 	mutex_init(&ls->ls_waiters_mutex);
538 	INIT_LIST_HEAD(&ls->ls_orphans);
539 	mutex_init(&ls->ls_orphans_mutex);
540 #ifdef CONFIG_DLM_DEPRECATED_API
541 	INIT_LIST_HEAD(&ls->ls_timeout);
542 	mutex_init(&ls->ls_timeout_mutex);
543 #endif
544 
545 	INIT_LIST_HEAD(&ls->ls_new_rsb);
546 	spin_lock_init(&ls->ls_new_rsb_spin);
547 
548 	INIT_LIST_HEAD(&ls->ls_nodes);
549 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
550 	ls->ls_num_nodes = 0;
551 	ls->ls_low_nodeid = 0;
552 	ls->ls_total_weight = 0;
553 	ls->ls_node_array = NULL;
554 
555 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
556 	ls->ls_stub_rsb.res_ls = ls;
557 
558 	ls->ls_debug_rsb_dentry = NULL;
559 	ls->ls_debug_waiters_dentry = NULL;
560 
561 	init_waitqueue_head(&ls->ls_uevent_wait);
562 	ls->ls_uevent_result = 0;
563 	init_completion(&ls->ls_recovery_done);
564 	ls->ls_recovery_result = -1;
565 
566 	spin_lock_init(&ls->ls_cb_lock);
567 	INIT_LIST_HEAD(&ls->ls_cb_delay);
568 
569 	ls->ls_recoverd_task = NULL;
570 	mutex_init(&ls->ls_recoverd_active);
571 	spin_lock_init(&ls->ls_recover_lock);
572 	spin_lock_init(&ls->ls_rcom_spin);
573 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
574 	ls->ls_recover_status = 0;
575 	ls->ls_recover_seq = 0;
576 	ls->ls_recover_args = NULL;
577 	init_rwsem(&ls->ls_in_recovery);
578 	init_rwsem(&ls->ls_recv_active);
579 	INIT_LIST_HEAD(&ls->ls_requestqueue);
580 	atomic_set(&ls->ls_requestqueue_cnt, 0);
581 	init_waitqueue_head(&ls->ls_requestqueue_wait);
582 	mutex_init(&ls->ls_requestqueue_mutex);
583 	spin_lock_init(&ls->ls_clear_proc_locks);
584 
585 	/* Due to backwards compatibility with 3.1 we need to use maximum
586 	 * possible dlm message size to be sure the message will fit and
587 	 * not having out of bounds issues. However on sending side 3.2
588 	 * might send less.
589 	 */
590 	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
591 	if (!ls->ls_recover_buf)
592 		goto out_lkbidr;
593 
594 	ls->ls_slot = 0;
595 	ls->ls_num_slots = 0;
596 	ls->ls_slots_size = 0;
597 	ls->ls_slots = NULL;
598 
599 	INIT_LIST_HEAD(&ls->ls_recover_list);
600 	spin_lock_init(&ls->ls_recover_list_lock);
601 	idr_init(&ls->ls_recover_idr);
602 	spin_lock_init(&ls->ls_recover_idr_lock);
603 	ls->ls_recover_list_count = 0;
	/* the handle given to callers is the address of the dlm_ls itself */
604 	ls->ls_local_handle = ls;
605 	init_waitqueue_head(&ls->ls_wait_general);
606 	INIT_LIST_HEAD(&ls->ls_root_list);
607 	init_rwsem(&ls->ls_root_sem);
608 
	/* from here on the lockspace can be found through lslist */
609 	spin_lock(&lslist_lock);
610 	ls->ls_create_count = 1;
611 	list_add(&ls->ls_list, &lslist);
612 	spin_unlock(&lslist_lock);
613 
614 	if (flags & DLM_LSFL_FS) {
615 		error = dlm_callback_start(ls);
616 		if (error) {
617 			log_error(ls, "can't start dlm_callback %d", error);
618 			goto out_delist;
619 		}
620 	}
621 
622 	init_waitqueue_head(&ls->ls_recover_lock_wait);
623 
624 	/*
625 	 * Once started, dlm_recoverd first looks for ls in lslist, then
626 	 * initializes ls_in_recovery as locked in "down" mode.  We need
627 	 * to wait for the wakeup from dlm_recoverd because in_recovery
628 	 * has to start out in down mode.
629 	 */
630 
631 	error = dlm_recoverd_start(ls);
632 	if (error) {
633 		log_error(ls, "can't start dlm_recoverd %d", error);
634 		goto out_callback;
635 	}
636 
637 	wait_event(ls->ls_recover_lock_wait,
638 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
639 
640 	/* let kobject handle freeing of ls if there's an error */
641 	do_unreg = 1;
642 
643 	ls->ls_kobj.kset = dlm_kset;
644 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
645 				     "%s", ls->ls_name);
646 	if (error)
647 		goto out_recoverd;
648 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
649 
650 	/* This uevent triggers dlm_controld in userspace to add us to the
651 	   group of nodes that are members of this lockspace (managed by the
652 	   cluster infrastructure.)  Once it's done that, it tells us who the
653 	   current lockspace members are (via configfs) and then tells the
654 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
655 
656 	error = do_uevent(ls, 1);
657 	if (error)
658 		goto out_recoverd;
659 
660 	/* wait until recovery is successful or failed */
661 	wait_for_completion(&ls->ls_recovery_done);
662 	error = ls->ls_recovery_result;
663 	if (error)
664 		goto out_members;
665 
666 	dlm_create_debug_file(ls);
667 
668 	log_rinfo(ls, "join complete");
669 	*lockspace = ls;
	/* success: the module reference taken above stays with the lockspace */
670 	return 0;
671 
	/* error unwinding, in reverse order of the setup above */
672  out_members:
673 	do_uevent(ls, 0);
674 	dlm_clear_members(ls);
675 	kfree(ls->ls_node_array);
676  out_recoverd:
677 	dlm_recoverd_stop(ls);
678  out_callback:
679 	dlm_callback_stop(ls);
680  out_delist:
681 	spin_lock(&lslist_lock);
682 	list_del(&ls->ls_list);
683 	spin_unlock(&lslist_lock);
684 	idr_destroy(&ls->ls_recover_idr);
685 	kfree(ls->ls_recover_buf);
686  out_lkbidr:
687 	idr_destroy(&ls->ls_lkbidr);
688  out_rsbtbl:
689 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
690 		kfree(ls->ls_remove_names[i]);
691 	vfree(ls->ls_rsbtbl);
692  out_lsfree:
	/* the kobject release callback frees ls once the kobject was set up */
693 	if (do_unreg)
694 		kobject_put(&ls->ls_kobj);
695 	else
696 		kfree(ls);
697  out:
698 	module_put(THIS_MODULE);
699 	return error;
700 }
701 
702 static int __dlm_new_lockspace(const char *name, const char *cluster,
703 			       uint32_t flags, int lvblen,
704 			       const struct dlm_lockspace_ops *ops,
705 			       void *ops_arg, int *ops_result,
706 			       dlm_lockspace_t **lockspace)
707 {
708 	int error = 0;
709 
710 	mutex_lock(&ls_lock);
711 	if (!ls_count)
712 		error = threads_start();
713 	if (error)
714 		goto out;
715 
716 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
717 			      ops_result, lockspace);
718 	if (!error)
719 		ls_count++;
720 	if (error > 0)
721 		error = 0;
722 	if (!ls_count) {
723 		dlm_scand_stop();
724 		dlm_midcomms_shutdown();
725 		dlm_midcomms_stop();
726 	}
727  out:
728 	mutex_unlock(&ls_lock);
729 	return error;
730 }
731 
732 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
733 		      int lvblen, const struct dlm_lockspace_ops *ops,
734 		      void *ops_arg, int *ops_result,
735 		      dlm_lockspace_t **lockspace)
736 {
737 	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
738 				   ops, ops_arg, ops_result, lockspace);
739 }
740 
741 int dlm_new_user_lockspace(const char *name, const char *cluster,
742 			   uint32_t flags, int lvblen,
743 			   const struct dlm_lockspace_ops *ops,
744 			   void *ops_arg, int *ops_result,
745 			   dlm_lockspace_t **lockspace)
746 {
747 	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
748 				   ops_arg, ops_result, lockspace);
749 }
750 
751 static int lkb_idr_is_local(int id, void *p, void *data)
752 {
753 	struct dlm_lkb *lkb = p;
754 
755 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
756 }
757 
/*
 * idr_for_each() callback that matches every entry: the mere presence of an
 * lkb in the idr makes the walk stop and report non-zero.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	const int match = 1;

	return match;
}
762 
763 static int lkb_idr_free(int id, void *p, void *data)
764 {
765 	struct dlm_lkb *lkb = p;
766 
767 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
768 		dlm_free_lvb(lkb->lkb_lvbptr);
769 
770 	dlm_free_lkb(lkb);
771 	return 0;
772 }
773 
774 /* NOTE: We check the lkbidr here rather than the resource table.
775    This is because there may be LKBs queued as ASTs that have been unlinked
776    from their RSBs and are pending deletion once the AST has been delivered */
777 
778 static int lockspace_busy(struct dlm_ls *ls, int force)
779 {
780 	int rv;
781 
782 	spin_lock(&ls->ls_lkbidr_spin);
783 	if (force == 0) {
784 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
785 	} else if (force == 1) {
786 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
787 	} else {
788 		rv = 0;
789 	}
790 	spin_unlock(&ls->ls_lkbidr_spin);
791 	return rv;
792 }
793 
/*
 * release_lockspace - drop one create reference on ls and tear the lockspace
 * down when it was the last one.
 *
 * Returns 0 after a complete teardown; the remaining ls_create_count (> 0)
 * when other creators still hold the lockspace; -EBUSY when this was the last
 * reference but lockspace_busy() reported lkbs for the given force level;
 * -EINVAL when ls_create_count was not positive.  Only the 0 case frees
 * anything.
 */
794 static int release_lockspace(struct dlm_ls *ls, int force)
795 {
796 	struct dlm_rsb *rsb;
797 	struct rb_node *n;
798 	int i, busy, rv;
799 
800 	busy = lockspace_busy(ls, force);
801 
802 	spin_lock(&lslist_lock);
803 	if (ls->ls_create_count == 1) {
804 		if (busy) {
805 			rv = -EBUSY;
806 		} else {
807 			/* remove_lockspace takes ls off lslist */
808 			ls->ls_create_count = 0;
809 			rv = 0;
810 		}
811 	} else if (ls->ls_create_count > 1) {
812 		rv = --ls->ls_create_count;
813 	} else {
814 		rv = -EINVAL;
815 	}
816 	spin_unlock(&lslist_lock);
817 
	/* anything but 0 means the lockspace stays: still in use, busy or invalid */
818 	if (rv) {
819 		log_debug(ls, "release_lockspace no remove %d", rv);
820 		return rv;
821 	}
822 
823 	dlm_device_deregister(ls);
824 
	/* skip the offline uevent for force >= 3 or when no user daemon is available */
825 	if (force < 3 && dlm_user_daemon_available())
826 		do_uevent(ls, 0);
827 
828 	dlm_recoverd_stop(ls);
829 
	/* ls_count == 1: this is the last lockspace, so stop the shared machinery too */
830 	if (ls_count == 1) {
831 		dlm_scand_stop();
832 		dlm_clear_members(ls);
833 		dlm_midcomms_shutdown();
834 	}
835 
836 	dlm_callback_stop(ls);
837 
	/* waits for all counted references to go away, then delists ls */
838 	remove_lockspace(ls);
839 
840 	dlm_delete_debug_file(ls);
841 
842 	idr_destroy(&ls->ls_recover_idr);
843 	kfree(ls->ls_recover_buf);
844 
845 	/*
846 	 * Free all lkb's in idr
847 	 */
848 
849 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
850 	idr_destroy(&ls->ls_lkbidr);
851 
852 	/*
853 	 * Free all rsb's on rsbtbl[] lists
854 	 */
855 
856 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
857 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
858 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
859 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
860 			dlm_free_rsb(rsb);
861 		}
862 
863 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
864 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
865 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
866 			dlm_free_rsb(rsb);
867 		}
868 	}
869 
870 	vfree(ls->ls_rsbtbl);
871 
872 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
873 		kfree(ls->ls_remove_names[i]);
874 
	/* free the rsbs still sitting on the ls_new_rsb list */
875 	while (!list_empty(&ls->ls_new_rsb)) {
876 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
877 				       res_hashchain);
878 		list_del(&rsb->res_hashchain);
879 		dlm_free_rsb(rsb);
880 	}
881 
882 	/*
883 	 * Free structures on any other lists
884 	 */
885 
886 	dlm_purge_requestqueue(ls);
887 	kfree(ls->ls_recover_args);
888 	dlm_clear_members(ls);
889 	dlm_clear_members_gone(ls);
890 	kfree(ls->ls_node_array);
891 	log_rinfo(ls, "release_lockspace final free");
892 	kobject_put(&ls->ls_kobj);
893 	/* The ls structure will be freed when the kobject is done with */
894 
	/* drops the module reference kept by new_lockspace() on successful creation */
895 	module_put(THIS_MODULE);
896 	return 0;
897 }
898 
899 /*
900  * Called when a system has released all its locks and is not going to use the
901  * lockspace any longer.  We free everything we're managing for this lockspace.
902  * Remaining nodes will go through the recovery process as if we'd died.  The
903  * lockspace must continue to function as usual, participating in recoveries,
904  * until this returns.
905  *
906  * Force has 4 possible values:
907  * 0 - don't destroy lockspace if it has any LKBs
908  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
909  * 2 - destroy lockspace regardless of LKBs
910  * 3 - destroy lockspace as part of a forced shutdown
911  */
912 
913 int dlm_release_lockspace(void *lockspace, int force)
914 {
915 	struct dlm_ls *ls;
916 	int error;
917 
918 	ls = dlm_find_lockspace_local(lockspace);
919 	if (!ls)
920 		return -EINVAL;
921 	dlm_put_lockspace(ls);
922 
923 	mutex_lock(&ls_lock);
924 	error = release_lockspace(ls, force);
925 	if (!error)
926 		ls_count--;
927 	if (!ls_count)
928 		dlm_midcomms_stop();
929 	mutex_unlock(&ls_lock);
930 
931 	return error;
932 }
933 
934 void dlm_stop_lockspaces(void)
935 {
936 	struct dlm_ls *ls;
937 	int count;
938 
939  restart:
940 	count = 0;
941 	spin_lock(&lslist_lock);
942 	list_for_each_entry(ls, &lslist, ls_list) {
943 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
944 			count++;
945 			continue;
946 		}
947 		spin_unlock(&lslist_lock);
948 		log_error(ls, "no userland control daemon, stopping lockspace");
949 		dlm_ls_stop(ls);
950 		goto restart;
951 	}
952 	spin_unlock(&lslist_lock);
953 
954 	if (count)
955 		log_print("dlm user daemon left %d lockspaces", count);
956 }
957 
958 void dlm_stop_lockspaces_check(void)
959 {
960 	struct dlm_ls *ls;
961 
962 	spin_lock(&lslist_lock);
963 	list_for_each_entry(ls, &lslist, ls_list) {
964 		if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
965 			    !dlm_locking_stopped(ls)))
966 			break;
967 	}
968 	spin_unlock(&lslist_lock);
969 }
970