xref: /openbmc/linux/fs/dlm/lockspace.c (revision e7bae9bb)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162 
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 			     char *buf)
165 {
166 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 	return a->show ? a->show(ls, buf) : 0;
169 }
170 
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 			      const char *buf, size_t len)
173 {
174 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 	return a->store ? a->store(ls, buf, len) : len;
177 }
178 
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182 	kfree(ls);
183 }
184 
185 static const struct sysfs_ops dlm_attr_ops = {
186 	.show  = dlm_attr_show,
187 	.store = dlm_attr_store,
188 };
189 
190 static struct kobj_type dlm_ktype = {
191 	.default_groups = dlm_groups,
192 	.sysfs_ops     = &dlm_attr_ops,
193 	.release       = lockspace_kobj_release,
194 };
195 
196 static struct kset *dlm_kset;
197 
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 	if (in)
201 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 	else
203 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204 
205 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206 
207 	/* dlm_controld will see the uevent, do the necessary group management
208 	   and then write to sysfs to wake us */
209 
210 	wait_event(ls->ls_uevent_wait,
211 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212 
213 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214 
215 	return ls->ls_uevent_result;
216 }
217 
218 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
219 		      struct kobj_uevent_env *env)
220 {
221 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222 
223 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 	return 0;
225 }
226 
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228 	.uevent = dlm_uevent,
229 };
230 
231 int __init dlm_lockspace_init(void)
232 {
233 	ls_count = 0;
234 	mutex_init(&ls_lock);
235 	INIT_LIST_HEAD(&lslist);
236 	spin_lock_init(&lslist_lock);
237 
238 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 	if (!dlm_kset) {
240 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 		return -ENOMEM;
242 	}
243 	return 0;
244 }
245 
246 void dlm_lockspace_exit(void)
247 {
248 	kset_unregister(dlm_kset);
249 }
250 
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 	struct dlm_ls *ls;
254 
255 	spin_lock(&lslist_lock);
256 	list_for_each_entry(ls, &lslist, ls_list) {
257 		if (time_after_eq(jiffies, ls->ls_scan_time +
258 					    dlm_config.ci_scan_secs * HZ)) {
259 			spin_unlock(&lslist_lock);
260 			return ls;
261 		}
262 	}
263 	spin_unlock(&lslist_lock);
264 	return NULL;
265 }
266 
267 static int dlm_scand(void *data)
268 {
269 	struct dlm_ls *ls;
270 
271 	while (!kthread_should_stop()) {
272 		ls = find_ls_to_scan();
273 		if (ls) {
274 			if (dlm_lock_recovery_try(ls)) {
275 				ls->ls_scan_time = jiffies;
276 				dlm_scan_rsbs(ls);
277 				dlm_scan_timeout(ls);
278 				dlm_scan_waiters(ls);
279 				dlm_unlock_recovery(ls);
280 			} else {
281 				ls->ls_scan_time += HZ;
282 			}
283 			continue;
284 		}
285 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286 	}
287 	return 0;
288 }
289 
290 static int dlm_scand_start(void)
291 {
292 	struct task_struct *p;
293 	int error = 0;
294 
295 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
296 	if (IS_ERR(p))
297 		error = PTR_ERR(p);
298 	else
299 		scand_task = p;
300 	return error;
301 }
302 
303 static void dlm_scand_stop(void)
304 {
305 	kthread_stop(scand_task);
306 }
307 
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310 	struct dlm_ls *ls;
311 
312 	spin_lock(&lslist_lock);
313 
314 	list_for_each_entry(ls, &lslist, ls_list) {
315 		if (ls->ls_global_id == id) {
316 			ls->ls_count++;
317 			goto out;
318 		}
319 	}
320 	ls = NULL;
321  out:
322 	spin_unlock(&lslist_lock);
323 	return ls;
324 }
325 
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328 	struct dlm_ls *ls;
329 
330 	spin_lock(&lslist_lock);
331 	list_for_each_entry(ls, &lslist, ls_list) {
332 		if (ls->ls_local_handle == lockspace) {
333 			ls->ls_count++;
334 			goto out;
335 		}
336 	}
337 	ls = NULL;
338  out:
339 	spin_unlock(&lslist_lock);
340 	return ls;
341 }
342 
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345 	struct dlm_ls *ls;
346 
347 	spin_lock(&lslist_lock);
348 	list_for_each_entry(ls, &lslist, ls_list) {
349 		if (ls->ls_device.minor == minor) {
350 			ls->ls_count++;
351 			goto out;
352 		}
353 	}
354 	ls = NULL;
355  out:
356 	spin_unlock(&lslist_lock);
357 	return ls;
358 }
359 
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362 	spin_lock(&lslist_lock);
363 	ls->ls_count--;
364 	spin_unlock(&lslist_lock);
365 }
366 
367 static void remove_lockspace(struct dlm_ls *ls)
368 {
369 	for (;;) {
370 		spin_lock(&lslist_lock);
371 		if (ls->ls_count == 0) {
372 			WARN_ON(ls->ls_create_count != 0);
373 			list_del(&ls->ls_list);
374 			spin_unlock(&lslist_lock);
375 			return;
376 		}
377 		spin_unlock(&lslist_lock);
378 		ssleep(1);
379 	}
380 }
381 
/*
 * Start the helpers shared by all lockspaces: the dlm_scand thread first,
 * then lowcomms (message sending/receiving).  If lowcomms fails to start,
 * dlm_scand is stopped again.
 *
 * Returns 0 on success or the error from the step that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
406 
/* Stop what threads_start() started: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
412 
413 static int new_lockspace(const char *name, const char *cluster,
414 			 uint32_t flags, int lvblen,
415 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
416 			 int *ops_result, dlm_lockspace_t **lockspace)
417 {
418 	struct dlm_ls *ls;
419 	int i, size, error;
420 	int do_unreg = 0;
421 	int namelen = strlen(name);
422 
423 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
424 		return -EINVAL;
425 
426 	if (!lvblen || (lvblen % 8))
427 		return -EINVAL;
428 
429 	if (!try_module_get(THIS_MODULE))
430 		return -EINVAL;
431 
432 	if (!dlm_user_daemon_available()) {
433 		log_print("dlm user daemon not available");
434 		error = -EUNATCH;
435 		goto out;
436 	}
437 
438 	if (ops && ops_result) {
439 	       	if (!dlm_config.ci_recover_callbacks)
440 			*ops_result = -EOPNOTSUPP;
441 		else
442 			*ops_result = 0;
443 	}
444 
445 	if (!cluster)
446 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
447 			  dlm_config.ci_cluster_name);
448 
449 	if (dlm_config.ci_recover_callbacks && cluster &&
450 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
451 		log_print("dlm cluster name '%s' does not match "
452 			  "the application cluster name '%s'",
453 			  dlm_config.ci_cluster_name, cluster);
454 		error = -EBADR;
455 		goto out;
456 	}
457 
458 	error = 0;
459 
460 	spin_lock(&lslist_lock);
461 	list_for_each_entry(ls, &lslist, ls_list) {
462 		WARN_ON(ls->ls_create_count <= 0);
463 		if (ls->ls_namelen != namelen)
464 			continue;
465 		if (memcmp(ls->ls_name, name, namelen))
466 			continue;
467 		if (flags & DLM_LSFL_NEWEXCL) {
468 			error = -EEXIST;
469 			break;
470 		}
471 		ls->ls_create_count++;
472 		*lockspace = ls;
473 		error = 1;
474 		break;
475 	}
476 	spin_unlock(&lslist_lock);
477 
478 	if (error)
479 		goto out;
480 
481 	error = -ENOMEM;
482 
483 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
484 	if (!ls)
485 		goto out;
486 	memcpy(ls->ls_name, name, namelen);
487 	ls->ls_namelen = namelen;
488 	ls->ls_lvblen = lvblen;
489 	ls->ls_count = 0;
490 	ls->ls_flags = 0;
491 	ls->ls_scan_time = jiffies;
492 
493 	if (ops && dlm_config.ci_recover_callbacks) {
494 		ls->ls_ops = ops;
495 		ls->ls_ops_arg = ops_arg;
496 	}
497 
498 	if (flags & DLM_LSFL_TIMEWARN)
499 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
500 
501 	/* ls_exflags are forced to match among nodes, and we don't
502 	   need to require all nodes to have some flags set */
503 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
504 				    DLM_LSFL_NEWEXCL));
505 
506 	size = dlm_config.ci_rsbtbl_size;
507 	ls->ls_rsbtbl_size = size;
508 
509 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
510 	if (!ls->ls_rsbtbl)
511 		goto out_lsfree;
512 	for (i = 0; i < size; i++) {
513 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
514 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
515 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
516 	}
517 
518 	spin_lock_init(&ls->ls_remove_spin);
519 
520 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
521 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
522 						 GFP_KERNEL);
523 		if (!ls->ls_remove_names[i])
524 			goto out_rsbtbl;
525 	}
526 
527 	idr_init(&ls->ls_lkbidr);
528 	spin_lock_init(&ls->ls_lkbidr_spin);
529 
530 	INIT_LIST_HEAD(&ls->ls_waiters);
531 	mutex_init(&ls->ls_waiters_mutex);
532 	INIT_LIST_HEAD(&ls->ls_orphans);
533 	mutex_init(&ls->ls_orphans_mutex);
534 	INIT_LIST_HEAD(&ls->ls_timeout);
535 	mutex_init(&ls->ls_timeout_mutex);
536 
537 	INIT_LIST_HEAD(&ls->ls_new_rsb);
538 	spin_lock_init(&ls->ls_new_rsb_spin);
539 
540 	INIT_LIST_HEAD(&ls->ls_nodes);
541 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
542 	ls->ls_num_nodes = 0;
543 	ls->ls_low_nodeid = 0;
544 	ls->ls_total_weight = 0;
545 	ls->ls_node_array = NULL;
546 
547 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
548 	ls->ls_stub_rsb.res_ls = ls;
549 
550 	ls->ls_debug_rsb_dentry = NULL;
551 	ls->ls_debug_waiters_dentry = NULL;
552 
553 	init_waitqueue_head(&ls->ls_uevent_wait);
554 	ls->ls_uevent_result = 0;
555 	init_completion(&ls->ls_members_done);
556 	ls->ls_members_result = -1;
557 
558 	mutex_init(&ls->ls_cb_mutex);
559 	INIT_LIST_HEAD(&ls->ls_cb_delay);
560 
561 	ls->ls_recoverd_task = NULL;
562 	mutex_init(&ls->ls_recoverd_active);
563 	spin_lock_init(&ls->ls_recover_lock);
564 	spin_lock_init(&ls->ls_rcom_spin);
565 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
566 	ls->ls_recover_status = 0;
567 	ls->ls_recover_seq = 0;
568 	ls->ls_recover_args = NULL;
569 	init_rwsem(&ls->ls_in_recovery);
570 	init_rwsem(&ls->ls_recv_active);
571 	INIT_LIST_HEAD(&ls->ls_requestqueue);
572 	mutex_init(&ls->ls_requestqueue_mutex);
573 	mutex_init(&ls->ls_clear_proc_locks);
574 
575 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
576 	if (!ls->ls_recover_buf)
577 		goto out_lkbidr;
578 
579 	ls->ls_slot = 0;
580 	ls->ls_num_slots = 0;
581 	ls->ls_slots_size = 0;
582 	ls->ls_slots = NULL;
583 
584 	INIT_LIST_HEAD(&ls->ls_recover_list);
585 	spin_lock_init(&ls->ls_recover_list_lock);
586 	idr_init(&ls->ls_recover_idr);
587 	spin_lock_init(&ls->ls_recover_idr_lock);
588 	ls->ls_recover_list_count = 0;
589 	ls->ls_local_handle = ls;
590 	init_waitqueue_head(&ls->ls_wait_general);
591 	INIT_LIST_HEAD(&ls->ls_root_list);
592 	init_rwsem(&ls->ls_root_sem);
593 
594 	spin_lock(&lslist_lock);
595 	ls->ls_create_count = 1;
596 	list_add(&ls->ls_list, &lslist);
597 	spin_unlock(&lslist_lock);
598 
599 	if (flags & DLM_LSFL_FS) {
600 		error = dlm_callback_start(ls);
601 		if (error) {
602 			log_error(ls, "can't start dlm_callback %d", error);
603 			goto out_delist;
604 		}
605 	}
606 
607 	init_waitqueue_head(&ls->ls_recover_lock_wait);
608 
609 	/*
610 	 * Once started, dlm_recoverd first looks for ls in lslist, then
611 	 * initializes ls_in_recovery as locked in "down" mode.  We need
612 	 * to wait for the wakeup from dlm_recoverd because in_recovery
613 	 * has to start out in down mode.
614 	 */
615 
616 	error = dlm_recoverd_start(ls);
617 	if (error) {
618 		log_error(ls, "can't start dlm_recoverd %d", error);
619 		goto out_callback;
620 	}
621 
622 	wait_event(ls->ls_recover_lock_wait,
623 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
624 
625 	/* let kobject handle freeing of ls if there's an error */
626 	do_unreg = 1;
627 
628 	ls->ls_kobj.kset = dlm_kset;
629 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
630 				     "%s", ls->ls_name);
631 	if (error)
632 		goto out_recoverd;
633 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
634 
635 	/* This uevent triggers dlm_controld in userspace to add us to the
636 	   group of nodes that are members of this lockspace (managed by the
637 	   cluster infrastructure.)  Once it's done that, it tells us who the
638 	   current lockspace members are (via configfs) and then tells the
639 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
640 
641 	error = do_uevent(ls, 1);
642 	if (error)
643 		goto out_recoverd;
644 
645 	wait_for_completion(&ls->ls_members_done);
646 	error = ls->ls_members_result;
647 	if (error)
648 		goto out_members;
649 
650 	dlm_create_debug_file(ls);
651 
652 	log_rinfo(ls, "join complete");
653 	*lockspace = ls;
654 	return 0;
655 
656  out_members:
657 	do_uevent(ls, 0);
658 	dlm_clear_members(ls);
659 	kfree(ls->ls_node_array);
660  out_recoverd:
661 	dlm_recoverd_stop(ls);
662  out_callback:
663 	dlm_callback_stop(ls);
664  out_delist:
665 	spin_lock(&lslist_lock);
666 	list_del(&ls->ls_list);
667 	spin_unlock(&lslist_lock);
668 	idr_destroy(&ls->ls_recover_idr);
669 	kfree(ls->ls_recover_buf);
670  out_lkbidr:
671 	idr_destroy(&ls->ls_lkbidr);
672  out_rsbtbl:
673 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
674 		kfree(ls->ls_remove_names[i]);
675 	vfree(ls->ls_rsbtbl);
676  out_lsfree:
677 	if (do_unreg)
678 		kobject_put(&ls->ls_kobj);
679 	else
680 		kfree(ls);
681  out:
682 	module_put(THIS_MODULE);
683 	return error;
684 }
685 
686 int dlm_new_lockspace(const char *name, const char *cluster,
687 		      uint32_t flags, int lvblen,
688 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
689 		      int *ops_result, dlm_lockspace_t **lockspace)
690 {
691 	int error = 0;
692 
693 	mutex_lock(&ls_lock);
694 	if (!ls_count)
695 		error = threads_start();
696 	if (error)
697 		goto out;
698 
699 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
700 			      ops_result, lockspace);
701 	if (!error)
702 		ls_count++;
703 	if (error > 0)
704 		error = 0;
705 	if (!ls_count)
706 		threads_stop();
707  out:
708 	mutex_unlock(&ls_lock);
709 	return error;
710 }
711 
712 static int lkb_idr_is_local(int id, void *p, void *data)
713 {
714 	struct dlm_lkb *lkb = p;
715 
716 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
717 }
718 
/*
 * idr_for_each() callback that matches every entry: always returns 1, so
 * the iteration stops at the first lkb it meets.  All arguments are unused.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	(void)id;
	(void)p;
	(void)data;

	return 1;
}
723 
724 static int lkb_idr_free(int id, void *p, void *data)
725 {
726 	struct dlm_lkb *lkb = p;
727 
728 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
729 		dlm_free_lvb(lkb->lkb_lvbptr);
730 
731 	dlm_free_lkb(lkb);
732 	return 0;
733 }
734 
735 /* NOTE: We check the lkbidr here rather than the resource table.
736    This is because there may be LKBs queued as ASTs that have been unlinked
737    from their RSBs and are pending deletion once the AST has been delivered */
738 
739 static int lockspace_busy(struct dlm_ls *ls, int force)
740 {
741 	int rv;
742 
743 	spin_lock(&ls->ls_lkbidr_spin);
744 	if (force == 0) {
745 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
746 	} else if (force == 1) {
747 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
748 	} else {
749 		rv = 0;
750 	}
751 	spin_unlock(&ls->ls_lkbidr_spin);
752 	return rv;
753 }
754 
755 static int release_lockspace(struct dlm_ls *ls, int force)
756 {
757 	struct dlm_rsb *rsb;
758 	struct rb_node *n;
759 	int i, busy, rv;
760 
761 	busy = lockspace_busy(ls, force);
762 
763 	spin_lock(&lslist_lock);
764 	if (ls->ls_create_count == 1) {
765 		if (busy) {
766 			rv = -EBUSY;
767 		} else {
768 			/* remove_lockspace takes ls off lslist */
769 			ls->ls_create_count = 0;
770 			rv = 0;
771 		}
772 	} else if (ls->ls_create_count > 1) {
773 		rv = --ls->ls_create_count;
774 	} else {
775 		rv = -EINVAL;
776 	}
777 	spin_unlock(&lslist_lock);
778 
779 	if (rv) {
780 		log_debug(ls, "release_lockspace no remove %d", rv);
781 		return rv;
782 	}
783 
784 	dlm_device_deregister(ls);
785 
786 	if (force < 3 && dlm_user_daemon_available())
787 		do_uevent(ls, 0);
788 
789 	dlm_recoverd_stop(ls);
790 
791 	dlm_callback_stop(ls);
792 
793 	remove_lockspace(ls);
794 
795 	dlm_delete_debug_file(ls);
796 
797 	idr_destroy(&ls->ls_recover_idr);
798 	kfree(ls->ls_recover_buf);
799 
800 	/*
801 	 * Free all lkb's in idr
802 	 */
803 
804 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
805 	idr_destroy(&ls->ls_lkbidr);
806 
807 	/*
808 	 * Free all rsb's on rsbtbl[] lists
809 	 */
810 
811 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
812 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
813 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
814 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
815 			dlm_free_rsb(rsb);
816 		}
817 
818 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
819 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
820 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
821 			dlm_free_rsb(rsb);
822 		}
823 	}
824 
825 	vfree(ls->ls_rsbtbl);
826 
827 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
828 		kfree(ls->ls_remove_names[i]);
829 
830 	while (!list_empty(&ls->ls_new_rsb)) {
831 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
832 				       res_hashchain);
833 		list_del(&rsb->res_hashchain);
834 		dlm_free_rsb(rsb);
835 	}
836 
837 	/*
838 	 * Free structures on any other lists
839 	 */
840 
841 	dlm_purge_requestqueue(ls);
842 	kfree(ls->ls_recover_args);
843 	dlm_clear_members(ls);
844 	dlm_clear_members_gone(ls);
845 	kfree(ls->ls_node_array);
846 	log_rinfo(ls, "release_lockspace final free");
847 	kobject_put(&ls->ls_kobj);
848 	/* The ls structure will be freed when the kobject is done with */
849 
850 	module_put(THIS_MODULE);
851 	return 0;
852 }
853 
854 /*
855  * Called when a system has released all its locks and is not going to use the
856  * lockspace any longer.  We free everything we're managing for this lockspace.
857  * Remaining nodes will go through the recovery process as if we'd died.  The
858  * lockspace must continue to function as usual, participating in recoveries,
859  * until this returns.
860  *
861  * Force has 4 possible values:
862  * 0 - don't destroy locksapce if it has any LKBs
863  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
864  * 2 - destroy lockspace regardless of LKBs
865  * 3 - destroy lockspace as part of a forced shutdown
866  */
867 
868 int dlm_release_lockspace(void *lockspace, int force)
869 {
870 	struct dlm_ls *ls;
871 	int error;
872 
873 	ls = dlm_find_lockspace_local(lockspace);
874 	if (!ls)
875 		return -EINVAL;
876 	dlm_put_lockspace(ls);
877 
878 	mutex_lock(&ls_lock);
879 	error = release_lockspace(ls, force);
880 	if (!error)
881 		ls_count--;
882 	if (!ls_count)
883 		threads_stop();
884 	mutex_unlock(&ls_lock);
885 
886 	return error;
887 }
888 
889 void dlm_stop_lockspaces(void)
890 {
891 	struct dlm_ls *ls;
892 	int count;
893 
894  restart:
895 	count = 0;
896 	spin_lock(&lslist_lock);
897 	list_for_each_entry(ls, &lslist, ls_list) {
898 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
899 			count++;
900 			continue;
901 		}
902 		spin_unlock(&lslist_lock);
903 		log_error(ls, "no userland control daemon, stopping lockspace");
904 		dlm_ls_stop(ls);
905 		goto restart;
906 	}
907 	spin_unlock(&lslist_lock);
908 
909 	if (count)
910 		log_print("dlm user daemon left %d lockspaces", count);
911 }
912 
913