xref: /openbmc/linux/fs/dlm/lockspace.c (revision d2999e1b)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
/*
 * Module-wide state shared by every lockspace:
 *   ls_count    - bumped by dlm_new_lockspace() when new_lockspace() creates
 *                 a lockspace and dropped by dlm_release_lockspace() when
 *                 release_lockspace() returns 0; the shared threads are
 *                 started/stopped when it is zero.
 *   ls_lock     - mutex held by dlm_new_lockspace() and
 *                 dlm_release_lockspace() around those updates.
 *   lslist      - list of lockspaces, linked through dlm_ls.ls_list.
 *   lslist_lock - spinlock taken around lslist walks and around updates of
 *                 the per-lockspace ls_count / ls_create_count fields.
 *   scand_task  - task returned by kthread_run() for dlm_scand.
 */
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
/*
 * A lockspace sysfs attribute: the generic attribute plus optional show and
 * store handlers that receive the owning lockspace directly.  dlm_attr_show()
 * and dlm_attr_store() recover this structure with container_of() and
 * tolerate a NULL handler.
 */
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
/*
 * Per-lockspace sysfs attributes.  "control" and "event_done" are write-only
 * (S_IWUSR), "id" and "nodir" are readable by all and writable by the owner,
 * and "recover_status" / "recover_nodeid" are read-only.  Each one wires the
 * matching dlm_*_show / dlm_*_store handler defined in this file.
 */
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
/*
 * NULL-terminated list of the attributes above; installed as the default
 * attribute set of dlm_ktype.
 */
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 
162 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
163 			     char *buf)
164 {
165 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
166 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
167 	return a->show ? a->show(ls, buf) : 0;
168 }
169 
170 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
171 			      const char *buf, size_t len)
172 {
173 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
174 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
175 	return a->store ? a->store(ls, buf, len) : len;
176 }
177 
178 static void lockspace_kobj_release(struct kobject *k)
179 {
180 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
181 	kfree(ls);
182 }
183 
/*
 * kobject plumbing for lockspaces: dlm_attr_ops routes sysfs reads/writes to
 * dlm_attr_show()/dlm_attr_store(), dlm_ktype ties together the default
 * attributes, those ops and the release callback that frees the lockspace,
 * and dlm_kset is the kset created in dlm_lockspace_init() that each
 * lockspace kobject is assigned to in new_lockspace().
 */
184 static const struct sysfs_ops dlm_attr_ops = {
185 	.show  = dlm_attr_show,
186 	.store = dlm_attr_store,
187 };
188 
189 static struct kobj_type dlm_ktype = {
190 	.default_attrs = dlm_attrs,
191 	.sysfs_ops     = &dlm_attr_ops,
192 	.release       = lockspace_kobj_release,
193 };
194 
195 static struct kset *dlm_kset;
196 
197 static int do_uevent(struct dlm_ls *ls, int in)
198 {
199 	int error;
200 
201 	if (in)
202 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203 	else
204 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205 
206 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207 
208 	/* dlm_controld will see the uevent, do the necessary group management
209 	   and then write to sysfs to wake us */
210 
211 	error = wait_event_interruptible(ls->ls_uevent_wait,
212 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213 
214 	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
215 
216 	if (error)
217 		goto out;
218 
219 	error = ls->ls_uevent_result;
220  out:
221 	if (error)
222 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
223 			  error, ls->ls_uevent_result);
224 	return error;
225 }
226 
227 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
228 		      struct kobj_uevent_env *env)
229 {
230 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
231 
232 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
233 	return 0;
234 }
235 
/* uevent hooks passed to kset_create_and_add() in dlm_lockspace_init(). */
236 static struct kset_uevent_ops dlm_uevent_ops = {
237 	.uevent = dlm_uevent,
238 };
239 
240 int __init dlm_lockspace_init(void)
241 {
242 	ls_count = 0;
243 	mutex_init(&ls_lock);
244 	INIT_LIST_HEAD(&lslist);
245 	spin_lock_init(&lslist_lock);
246 
247 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
248 	if (!dlm_kset) {
249 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
250 		return -ENOMEM;
251 	}
252 	return 0;
253 }
254 
255 void dlm_lockspace_exit(void)
256 {
257 	kset_unregister(dlm_kset);
258 }
259 
260 static struct dlm_ls *find_ls_to_scan(void)
261 {
262 	struct dlm_ls *ls;
263 
264 	spin_lock(&lslist_lock);
265 	list_for_each_entry(ls, &lslist, ls_list) {
266 		if (time_after_eq(jiffies, ls->ls_scan_time +
267 					    dlm_config.ci_scan_secs * HZ)) {
268 			spin_unlock(&lslist_lock);
269 			return ls;
270 		}
271 	}
272 	spin_unlock(&lslist_lock);
273 	return NULL;
274 }
275 
276 static int dlm_scand(void *data)
277 {
278 	struct dlm_ls *ls;
279 
280 	while (!kthread_should_stop()) {
281 		ls = find_ls_to_scan();
282 		if (ls) {
283 			if (dlm_lock_recovery_try(ls)) {
284 				ls->ls_scan_time = jiffies;
285 				dlm_scan_rsbs(ls);
286 				dlm_scan_timeout(ls);
287 				dlm_scan_waiters(ls);
288 				dlm_unlock_recovery(ls);
289 			} else {
290 				ls->ls_scan_time += HZ;
291 			}
292 			continue;
293 		}
294 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
295 	}
296 	return 0;
297 }
298 
299 static int dlm_scand_start(void)
300 {
301 	struct task_struct *p;
302 	int error = 0;
303 
304 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
305 	if (IS_ERR(p))
306 		error = PTR_ERR(p);
307 	else
308 		scand_task = p;
309 	return error;
310 }
311 
312 static void dlm_scand_stop(void)
313 {
314 	kthread_stop(scand_task);
315 }
316 
317 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
318 {
319 	struct dlm_ls *ls;
320 
321 	spin_lock(&lslist_lock);
322 
323 	list_for_each_entry(ls, &lslist, ls_list) {
324 		if (ls->ls_global_id == id) {
325 			ls->ls_count++;
326 			goto out;
327 		}
328 	}
329 	ls = NULL;
330  out:
331 	spin_unlock(&lslist_lock);
332 	return ls;
333 }
334 
335 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
336 {
337 	struct dlm_ls *ls;
338 
339 	spin_lock(&lslist_lock);
340 	list_for_each_entry(ls, &lslist, ls_list) {
341 		if (ls->ls_local_handle == lockspace) {
342 			ls->ls_count++;
343 			goto out;
344 		}
345 	}
346 	ls = NULL;
347  out:
348 	spin_unlock(&lslist_lock);
349 	return ls;
350 }
351 
352 struct dlm_ls *dlm_find_lockspace_device(int minor)
353 {
354 	struct dlm_ls *ls;
355 
356 	spin_lock(&lslist_lock);
357 	list_for_each_entry(ls, &lslist, ls_list) {
358 		if (ls->ls_device.minor == minor) {
359 			ls->ls_count++;
360 			goto out;
361 		}
362 	}
363 	ls = NULL;
364  out:
365 	spin_unlock(&lslist_lock);
366 	return ls;
367 }
368 
369 void dlm_put_lockspace(struct dlm_ls *ls)
370 {
371 	spin_lock(&lslist_lock);
372 	ls->ls_count--;
373 	spin_unlock(&lslist_lock);
374 }
375 
376 static void remove_lockspace(struct dlm_ls *ls)
377 {
378 	for (;;) {
379 		spin_lock(&lslist_lock);
380 		if (ls->ls_count == 0) {
381 			WARN_ON(ls->ls_create_count != 0);
382 			list_del(&ls->ls_list);
383 			spin_unlock(&lslist_lock);
384 			return;
385 		}
386 		spin_unlock(&lslist_lock);
387 		ssleep(1);
388 	}
389 }
390 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then lowcomms.
 * If lowcomms fails to start, dlm_scand is stopped again.
 *
 * Returns 0 on success or the error from whichever start call failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
415 
/* Stop the shared threads: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
421 
422 static int new_lockspace(const char *name, const char *cluster,
423 			 uint32_t flags, int lvblen,
424 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
425 			 int *ops_result, dlm_lockspace_t **lockspace)
426 {
427 	struct dlm_ls *ls;
428 	int i, size, error;
429 	int do_unreg = 0;
430 	int namelen = strlen(name);
431 
432 	if (namelen > DLM_LOCKSPACE_LEN)
433 		return -EINVAL;
434 
435 	if (!lvblen || (lvblen % 8))
436 		return -EINVAL;
437 
438 	if (!try_module_get(THIS_MODULE))
439 		return -EINVAL;
440 
441 	if (!dlm_user_daemon_available()) {
442 		log_print("dlm user daemon not available");
443 		error = -EUNATCH;
444 		goto out;
445 	}
446 
447 	if (ops && ops_result) {
448 	       	if (!dlm_config.ci_recover_callbacks)
449 			*ops_result = -EOPNOTSUPP;
450 		else
451 			*ops_result = 0;
452 	}
453 
454 	if (dlm_config.ci_recover_callbacks && cluster &&
455 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
456 		log_print("dlm cluster name %s mismatch %s",
457 			  dlm_config.ci_cluster_name, cluster);
458 		error = -EBADR;
459 		goto out;
460 	}
461 
462 	error = 0;
463 
464 	spin_lock(&lslist_lock);
465 	list_for_each_entry(ls, &lslist, ls_list) {
466 		WARN_ON(ls->ls_create_count <= 0);
467 		if (ls->ls_namelen != namelen)
468 			continue;
469 		if (memcmp(ls->ls_name, name, namelen))
470 			continue;
471 		if (flags & DLM_LSFL_NEWEXCL) {
472 			error = -EEXIST;
473 			break;
474 		}
475 		ls->ls_create_count++;
476 		*lockspace = ls;
477 		error = 1;
478 		break;
479 	}
480 	spin_unlock(&lslist_lock);
481 
482 	if (error)
483 		goto out;
484 
485 	error = -ENOMEM;
486 
487 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
488 	if (!ls)
489 		goto out;
490 	memcpy(ls->ls_name, name, namelen);
491 	ls->ls_namelen = namelen;
492 	ls->ls_lvblen = lvblen;
493 	ls->ls_count = 0;
494 	ls->ls_flags = 0;
495 	ls->ls_scan_time = jiffies;
496 
497 	if (ops && dlm_config.ci_recover_callbacks) {
498 		ls->ls_ops = ops;
499 		ls->ls_ops_arg = ops_arg;
500 	}
501 
502 	if (flags & DLM_LSFL_TIMEWARN)
503 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
504 
505 	/* ls_exflags are forced to match among nodes, and we don't
506 	   need to require all nodes to have some flags set */
507 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
508 				    DLM_LSFL_NEWEXCL));
509 
510 	size = dlm_config.ci_rsbtbl_size;
511 	ls->ls_rsbtbl_size = size;
512 
513 	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
514 	if (!ls->ls_rsbtbl)
515 		goto out_lsfree;
516 	for (i = 0; i < size; i++) {
517 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
518 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
519 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
520 	}
521 
522 	spin_lock_init(&ls->ls_remove_spin);
523 
524 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
525 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
526 						 GFP_KERNEL);
527 		if (!ls->ls_remove_names[i])
528 			goto out_rsbtbl;
529 	}
530 
531 	idr_init(&ls->ls_lkbidr);
532 	spin_lock_init(&ls->ls_lkbidr_spin);
533 
534 	INIT_LIST_HEAD(&ls->ls_waiters);
535 	mutex_init(&ls->ls_waiters_mutex);
536 	INIT_LIST_HEAD(&ls->ls_orphans);
537 	mutex_init(&ls->ls_orphans_mutex);
538 	INIT_LIST_HEAD(&ls->ls_timeout);
539 	mutex_init(&ls->ls_timeout_mutex);
540 
541 	INIT_LIST_HEAD(&ls->ls_new_rsb);
542 	spin_lock_init(&ls->ls_new_rsb_spin);
543 
544 	INIT_LIST_HEAD(&ls->ls_nodes);
545 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
546 	ls->ls_num_nodes = 0;
547 	ls->ls_low_nodeid = 0;
548 	ls->ls_total_weight = 0;
549 	ls->ls_node_array = NULL;
550 
551 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
552 	ls->ls_stub_rsb.res_ls = ls;
553 
554 	ls->ls_debug_rsb_dentry = NULL;
555 	ls->ls_debug_waiters_dentry = NULL;
556 
557 	init_waitqueue_head(&ls->ls_uevent_wait);
558 	ls->ls_uevent_result = 0;
559 	init_completion(&ls->ls_members_done);
560 	ls->ls_members_result = -1;
561 
562 	mutex_init(&ls->ls_cb_mutex);
563 	INIT_LIST_HEAD(&ls->ls_cb_delay);
564 
565 	ls->ls_recoverd_task = NULL;
566 	mutex_init(&ls->ls_recoverd_active);
567 	spin_lock_init(&ls->ls_recover_lock);
568 	spin_lock_init(&ls->ls_rcom_spin);
569 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
570 	ls->ls_recover_status = 0;
571 	ls->ls_recover_seq = 0;
572 	ls->ls_recover_args = NULL;
573 	init_rwsem(&ls->ls_in_recovery);
574 	init_rwsem(&ls->ls_recv_active);
575 	INIT_LIST_HEAD(&ls->ls_requestqueue);
576 	mutex_init(&ls->ls_requestqueue_mutex);
577 	mutex_init(&ls->ls_clear_proc_locks);
578 
579 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
580 	if (!ls->ls_recover_buf)
581 		goto out_lkbidr;
582 
583 	ls->ls_slot = 0;
584 	ls->ls_num_slots = 0;
585 	ls->ls_slots_size = 0;
586 	ls->ls_slots = NULL;
587 
588 	INIT_LIST_HEAD(&ls->ls_recover_list);
589 	spin_lock_init(&ls->ls_recover_list_lock);
590 	idr_init(&ls->ls_recover_idr);
591 	spin_lock_init(&ls->ls_recover_idr_lock);
592 	ls->ls_recover_list_count = 0;
593 	ls->ls_local_handle = ls;
594 	init_waitqueue_head(&ls->ls_wait_general);
595 	INIT_LIST_HEAD(&ls->ls_root_list);
596 	init_rwsem(&ls->ls_root_sem);
597 
598 	spin_lock(&lslist_lock);
599 	ls->ls_create_count = 1;
600 	list_add(&ls->ls_list, &lslist);
601 	spin_unlock(&lslist_lock);
602 
603 	if (flags & DLM_LSFL_FS) {
604 		error = dlm_callback_start(ls);
605 		if (error) {
606 			log_error(ls, "can't start dlm_callback %d", error);
607 			goto out_delist;
608 		}
609 	}
610 
611 	init_waitqueue_head(&ls->ls_recover_lock_wait);
612 
613 	/*
614 	 * Once started, dlm_recoverd first looks for ls in lslist, then
615 	 * initializes ls_in_recovery as locked in "down" mode.  We need
616 	 * to wait for the wakeup from dlm_recoverd because in_recovery
617 	 * has to start out in down mode.
618 	 */
619 
620 	error = dlm_recoverd_start(ls);
621 	if (error) {
622 		log_error(ls, "can't start dlm_recoverd %d", error);
623 		goto out_callback;
624 	}
625 
626 	wait_event(ls->ls_recover_lock_wait,
627 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
628 
629 	ls->ls_kobj.kset = dlm_kset;
630 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
631 				     "%s", ls->ls_name);
632 	if (error)
633 		goto out_recoverd;
634 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
635 
636 	/* let kobject handle freeing of ls if there's an error */
637 	do_unreg = 1;
638 
639 	/* This uevent triggers dlm_controld in userspace to add us to the
640 	   group of nodes that are members of this lockspace (managed by the
641 	   cluster infrastructure.)  Once it's done that, it tells us who the
642 	   current lockspace members are (via configfs) and then tells the
643 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
644 
645 	error = do_uevent(ls, 1);
646 	if (error)
647 		goto out_recoverd;
648 
649 	wait_for_completion(&ls->ls_members_done);
650 	error = ls->ls_members_result;
651 	if (error)
652 		goto out_members;
653 
654 	dlm_create_debug_file(ls);
655 
656 	log_rinfo(ls, "join complete");
657 	*lockspace = ls;
658 	return 0;
659 
660  out_members:
661 	do_uevent(ls, 0);
662 	dlm_clear_members(ls);
663 	kfree(ls->ls_node_array);
664  out_recoverd:
665 	dlm_recoverd_stop(ls);
666  out_callback:
667 	dlm_callback_stop(ls);
668  out_delist:
669 	spin_lock(&lslist_lock);
670 	list_del(&ls->ls_list);
671 	spin_unlock(&lslist_lock);
672 	idr_destroy(&ls->ls_recover_idr);
673 	kfree(ls->ls_recover_buf);
674  out_lkbidr:
675 	idr_destroy(&ls->ls_lkbidr);
676 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
677 		if (ls->ls_remove_names[i])
678 			kfree(ls->ls_remove_names[i]);
679 	}
680  out_rsbtbl:
681 	vfree(ls->ls_rsbtbl);
682  out_lsfree:
683 	if (do_unreg)
684 		kobject_put(&ls->ls_kobj);
685 	else
686 		kfree(ls);
687  out:
688 	module_put(THIS_MODULE);
689 	return error;
690 }
691 
692 int dlm_new_lockspace(const char *name, const char *cluster,
693 		      uint32_t flags, int lvblen,
694 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
695 		      int *ops_result, dlm_lockspace_t **lockspace)
696 {
697 	int error = 0;
698 
699 	mutex_lock(&ls_lock);
700 	if (!ls_count)
701 		error = threads_start();
702 	if (error)
703 		goto out;
704 
705 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
706 			      ops_result, lockspace);
707 	if (!error)
708 		ls_count++;
709 	if (error > 0)
710 		error = 0;
711 	if (!ls_count)
712 		threads_stop();
713  out:
714 	mutex_unlock(&ls_lock);
715 	return error;
716 }
717 
718 static int lkb_idr_is_local(int id, void *p, void *data)
719 {
720 	struct dlm_lkb *lkb = p;
721 
722 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
723 }
724 
/*
 * idr_for_each() callback that matches every entry: always returns 1, so the
 * iteration result is non-zero as soon as the idr holds anything.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	(void)id;
	(void)p;
	(void)data;

	return 1;
}
729 
730 static int lkb_idr_free(int id, void *p, void *data)
731 {
732 	struct dlm_lkb *lkb = p;
733 
734 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735 		dlm_free_lvb(lkb->lkb_lvbptr);
736 
737 	dlm_free_lkb(lkb);
738 	return 0;
739 }
740 
741 /* NOTE: We check the lkbidr here rather than the resource table.
742    This is because there may be LKBs queued as ASTs that have been unlinked
743    from their RSBs and are pending deletion once the AST has been delivered */
744 
745 static int lockspace_busy(struct dlm_ls *ls, int force)
746 {
747 	int rv;
748 
749 	spin_lock(&ls->ls_lkbidr_spin);
750 	if (force == 0) {
751 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752 	} else if (force == 1) {
753 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754 	} else {
755 		rv = 0;
756 	}
757 	spin_unlock(&ls->ls_lkbidr_spin);
758 	return rv;
759 }
760 
761 static int release_lockspace(struct dlm_ls *ls, int force)
762 {
763 	struct dlm_rsb *rsb;
764 	struct rb_node *n;
765 	int i, busy, rv;
766 
767 	busy = lockspace_busy(ls, force);
768 
769 	spin_lock(&lslist_lock);
770 	if (ls->ls_create_count == 1) {
771 		if (busy) {
772 			rv = -EBUSY;
773 		} else {
774 			/* remove_lockspace takes ls off lslist */
775 			ls->ls_create_count = 0;
776 			rv = 0;
777 		}
778 	} else if (ls->ls_create_count > 1) {
779 		rv = --ls->ls_create_count;
780 	} else {
781 		rv = -EINVAL;
782 	}
783 	spin_unlock(&lslist_lock);
784 
785 	if (rv) {
786 		log_debug(ls, "release_lockspace no remove %d", rv);
787 		return rv;
788 	}
789 
790 	dlm_device_deregister(ls);
791 
792 	if (force < 3 && dlm_user_daemon_available())
793 		do_uevent(ls, 0);
794 
795 	dlm_recoverd_stop(ls);
796 
797 	dlm_callback_stop(ls);
798 
799 	remove_lockspace(ls);
800 
801 	dlm_delete_debug_file(ls);
802 
803 	kfree(ls->ls_recover_buf);
804 
805 	/*
806 	 * Free all lkb's in idr
807 	 */
808 
809 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
810 	idr_destroy(&ls->ls_lkbidr);
811 
812 	/*
813 	 * Free all rsb's on rsbtbl[] lists
814 	 */
815 
816 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
817 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
818 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
819 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
820 			dlm_free_rsb(rsb);
821 		}
822 
823 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
824 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
825 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
826 			dlm_free_rsb(rsb);
827 		}
828 	}
829 
830 	vfree(ls->ls_rsbtbl);
831 
832 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
833 		kfree(ls->ls_remove_names[i]);
834 
835 	while (!list_empty(&ls->ls_new_rsb)) {
836 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
837 				       res_hashchain);
838 		list_del(&rsb->res_hashchain);
839 		dlm_free_rsb(rsb);
840 	}
841 
842 	/*
843 	 * Free structures on any other lists
844 	 */
845 
846 	dlm_purge_requestqueue(ls);
847 	kfree(ls->ls_recover_args);
848 	dlm_clear_members(ls);
849 	dlm_clear_members_gone(ls);
850 	kfree(ls->ls_node_array);
851 	log_rinfo(ls, "release_lockspace final free");
852 	kobject_put(&ls->ls_kobj);
853 	/* The ls structure will be freed when the kobject is done with */
854 
855 	module_put(THIS_MODULE);
856 	return 0;
857 }
858 
859 /*
860  * Called when a system has released all its locks and is not going to use the
861  * lockspace any longer.  We free everything we're managing for this lockspace.
862  * Remaining nodes will go through the recovery process as if we'd died.  The
863  * lockspace must continue to function as usual, participating in recoveries,
864  * until this returns.
865  *
866  * Force has 4 possible values:
867  * 0 - don't destroy locksapce if it has any LKBs
868  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
869  * 2 - destroy lockspace regardless of LKBs
870  * 3 - destroy lockspace as part of a forced shutdown
871  */
872 
873 int dlm_release_lockspace(void *lockspace, int force)
874 {
875 	struct dlm_ls *ls;
876 	int error;
877 
878 	ls = dlm_find_lockspace_local(lockspace);
879 	if (!ls)
880 		return -EINVAL;
881 	dlm_put_lockspace(ls);
882 
883 	mutex_lock(&ls_lock);
884 	error = release_lockspace(ls, force);
885 	if (!error)
886 		ls_count--;
887 	if (!ls_count)
888 		threads_stop();
889 	mutex_unlock(&ls_lock);
890 
891 	return error;
892 }
893 
894 void dlm_stop_lockspaces(void)
895 {
896 	struct dlm_ls *ls;
897 	int count;
898 
899  restart:
900 	count = 0;
901 	spin_lock(&lslist_lock);
902 	list_for_each_entry(ls, &lslist, ls_list) {
903 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
904 			count++;
905 			continue;
906 		}
907 		spin_unlock(&lslist_lock);
908 		log_error(ls, "no userland control daemon, stopping lockspace");
909 		dlm_ls_stop(ls);
910 		goto restart;
911 	}
912 	spin_unlock(&lslist_lock);
913 
914 	if (count)
915 		log_print("dlm user daemon left %d lockspaces", count);
916 }
917 
918