xref: /openbmc/linux/fs/dlm/lockspace.c (revision a1dff44b)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162 
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 			     char *buf)
165 {
166 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 	return a->show ? a->show(ls, buf) : 0;
169 }
170 
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 			      const char *buf, size_t len)
173 {
174 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 	return a->store ? a->store(ls, buf, len) : len;
177 }
178 
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182 	kfree(ls);
183 }
184 
185 static const struct sysfs_ops dlm_attr_ops = {
186 	.show  = dlm_attr_show,
187 	.store = dlm_attr_store,
188 };
189 
190 static struct kobj_type dlm_ktype = {
191 	.default_groups = dlm_groups,
192 	.sysfs_ops     = &dlm_attr_ops,
193 	.release       = lockspace_kobj_release,
194 };
195 
196 static struct kset *dlm_kset;
197 
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 	if (in)
201 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202 	else
203 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204 
205 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206 
207 	/* dlm_controld will see the uevent, do the necessary group management
208 	   and then write to sysfs to wake us */
209 
210 	wait_event(ls->ls_uevent_wait,
211 		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212 
213 	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214 
215 	return ls->ls_uevent_result;
216 }
217 
218 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
219 		      struct kobj_uevent_env *env)
220 {
221 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222 
223 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224 	return 0;
225 }
226 
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228 	.uevent = dlm_uevent,
229 };
230 
231 int __init dlm_lockspace_init(void)
232 {
233 	ls_count = 0;
234 	mutex_init(&ls_lock);
235 	INIT_LIST_HEAD(&lslist);
236 	spin_lock_init(&lslist_lock);
237 
238 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239 	if (!dlm_kset) {
240 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
241 		return -ENOMEM;
242 	}
243 	return 0;
244 }
245 
246 void dlm_lockspace_exit(void)
247 {
248 	kset_unregister(dlm_kset);
249 }
250 
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253 	struct dlm_ls *ls;
254 
255 	spin_lock(&lslist_lock);
256 	list_for_each_entry(ls, &lslist, ls_list) {
257 		if (time_after_eq(jiffies, ls->ls_scan_time +
258 					    dlm_config.ci_scan_secs * HZ)) {
259 			spin_unlock(&lslist_lock);
260 			return ls;
261 		}
262 	}
263 	spin_unlock(&lslist_lock);
264 	return NULL;
265 }
266 
267 static int dlm_scand(void *data)
268 {
269 	struct dlm_ls *ls;
270 
271 	while (!kthread_should_stop()) {
272 		ls = find_ls_to_scan();
273 		if (ls) {
274 			if (dlm_lock_recovery_try(ls)) {
275 				ls->ls_scan_time = jiffies;
276 				dlm_scan_rsbs(ls);
277 				dlm_scan_timeout(ls);
278 				dlm_scan_waiters(ls);
279 				dlm_unlock_recovery(ls);
280 			} else {
281 				ls->ls_scan_time += HZ;
282 			}
283 			continue;
284 		}
285 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286 	}
287 	return 0;
288 }
289 
290 static int dlm_scand_start(void)
291 {
292 	struct task_struct *p;
293 	int error = 0;
294 
295 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
296 	if (IS_ERR(p))
297 		error = PTR_ERR(p);
298 	else
299 		scand_task = p;
300 	return error;
301 }
302 
303 static void dlm_scand_stop(void)
304 {
305 	kthread_stop(scand_task);
306 }
307 
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310 	struct dlm_ls *ls;
311 
312 	spin_lock(&lslist_lock);
313 
314 	list_for_each_entry(ls, &lslist, ls_list) {
315 		if (ls->ls_global_id == id) {
316 			ls->ls_count++;
317 			goto out;
318 		}
319 	}
320 	ls = NULL;
321  out:
322 	spin_unlock(&lslist_lock);
323 	return ls;
324 }
325 
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328 	struct dlm_ls *ls;
329 
330 	spin_lock(&lslist_lock);
331 	list_for_each_entry(ls, &lslist, ls_list) {
332 		if (ls->ls_local_handle == lockspace) {
333 			ls->ls_count++;
334 			goto out;
335 		}
336 	}
337 	ls = NULL;
338  out:
339 	spin_unlock(&lslist_lock);
340 	return ls;
341 }
342 
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345 	struct dlm_ls *ls;
346 
347 	spin_lock(&lslist_lock);
348 	list_for_each_entry(ls, &lslist, ls_list) {
349 		if (ls->ls_device.minor == minor) {
350 			ls->ls_count++;
351 			goto out;
352 		}
353 	}
354 	ls = NULL;
355  out:
356 	spin_unlock(&lslist_lock);
357 	return ls;
358 }
359 
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362 	spin_lock(&lslist_lock);
363 	ls->ls_count--;
364 	spin_unlock(&lslist_lock);
365 }
366 
367 static void remove_lockspace(struct dlm_ls *ls)
368 {
369 	for (;;) {
370 		spin_lock(&lslist_lock);
371 		if (ls->ls_count == 0) {
372 			WARN_ON(ls->ls_create_count != 0);
373 			list_del(&ls->ls_list);
374 			spin_unlock(&lslist_lock);
375 			return;
376 		}
377 		spin_unlock(&lslist_lock);
378 		ssleep(1);
379 	}
380 }
381 
382 static int threads_start(void)
383 {
384 	int error;
385 
386 	error = dlm_scand_start();
387 	if (error) {
388 		log_print("cannot start dlm_scand thread %d", error);
389 		goto fail;
390 	}
391 
392 	/* Thread for sending/receiving messages for all lockspace's */
393 	error = dlm_lowcomms_start();
394 	if (error) {
395 		log_print("cannot start dlm lowcomms %d", error);
396 		goto scand_fail;
397 	}
398 
399 	return 0;
400 
401  scand_fail:
402 	dlm_scand_stop();
403  fail:
404 	return error;
405 }
406 
407 static int new_lockspace(const char *name, const char *cluster,
408 			 uint32_t flags, int lvblen,
409 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
410 			 int *ops_result, dlm_lockspace_t **lockspace)
411 {
412 	struct dlm_ls *ls;
413 	int i, size, error;
414 	int do_unreg = 0;
415 	int namelen = strlen(name);
416 
417 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
418 		return -EINVAL;
419 
420 	if (!lvblen || (lvblen % 8))
421 		return -EINVAL;
422 
423 	if (!try_module_get(THIS_MODULE))
424 		return -EINVAL;
425 
426 	if (!dlm_user_daemon_available()) {
427 		log_print("dlm user daemon not available");
428 		error = -EUNATCH;
429 		goto out;
430 	}
431 
432 	if (ops && ops_result) {
433 	       	if (!dlm_config.ci_recover_callbacks)
434 			*ops_result = -EOPNOTSUPP;
435 		else
436 			*ops_result = 0;
437 	}
438 
439 	if (!cluster)
440 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
441 			  dlm_config.ci_cluster_name);
442 
443 	if (dlm_config.ci_recover_callbacks && cluster &&
444 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
445 		log_print("dlm cluster name '%s' does not match "
446 			  "the application cluster name '%s'",
447 			  dlm_config.ci_cluster_name, cluster);
448 		error = -EBADR;
449 		goto out;
450 	}
451 
452 	error = 0;
453 
454 	spin_lock(&lslist_lock);
455 	list_for_each_entry(ls, &lslist, ls_list) {
456 		WARN_ON(ls->ls_create_count <= 0);
457 		if (ls->ls_namelen != namelen)
458 			continue;
459 		if (memcmp(ls->ls_name, name, namelen))
460 			continue;
461 		if (flags & DLM_LSFL_NEWEXCL) {
462 			error = -EEXIST;
463 			break;
464 		}
465 		ls->ls_create_count++;
466 		*lockspace = ls;
467 		error = 1;
468 		break;
469 	}
470 	spin_unlock(&lslist_lock);
471 
472 	if (error)
473 		goto out;
474 
475 	error = -ENOMEM;
476 
477 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
478 	if (!ls)
479 		goto out;
480 	memcpy(ls->ls_name, name, namelen);
481 	ls->ls_namelen = namelen;
482 	ls->ls_lvblen = lvblen;
483 	ls->ls_count = 0;
484 	ls->ls_flags = 0;
485 	ls->ls_scan_time = jiffies;
486 
487 	if (ops && dlm_config.ci_recover_callbacks) {
488 		ls->ls_ops = ops;
489 		ls->ls_ops_arg = ops_arg;
490 	}
491 
492 	if (flags & DLM_LSFL_TIMEWARN)
493 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
494 
495 	/* ls_exflags are forced to match among nodes, and we don't
496 	   need to require all nodes to have some flags set */
497 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
498 				    DLM_LSFL_NEWEXCL));
499 
500 	size = dlm_config.ci_rsbtbl_size;
501 	ls->ls_rsbtbl_size = size;
502 
503 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
504 	if (!ls->ls_rsbtbl)
505 		goto out_lsfree;
506 	for (i = 0; i < size; i++) {
507 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
508 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
509 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
510 	}
511 
512 	spin_lock_init(&ls->ls_remove_spin);
513 
514 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
515 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
516 						 GFP_KERNEL);
517 		if (!ls->ls_remove_names[i])
518 			goto out_rsbtbl;
519 	}
520 
521 	idr_init(&ls->ls_lkbidr);
522 	spin_lock_init(&ls->ls_lkbidr_spin);
523 
524 	INIT_LIST_HEAD(&ls->ls_waiters);
525 	mutex_init(&ls->ls_waiters_mutex);
526 	INIT_LIST_HEAD(&ls->ls_orphans);
527 	mutex_init(&ls->ls_orphans_mutex);
528 	INIT_LIST_HEAD(&ls->ls_timeout);
529 	mutex_init(&ls->ls_timeout_mutex);
530 
531 	INIT_LIST_HEAD(&ls->ls_new_rsb);
532 	spin_lock_init(&ls->ls_new_rsb_spin);
533 
534 	INIT_LIST_HEAD(&ls->ls_nodes);
535 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
536 	ls->ls_num_nodes = 0;
537 	ls->ls_low_nodeid = 0;
538 	ls->ls_total_weight = 0;
539 	ls->ls_node_array = NULL;
540 
541 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
542 	ls->ls_stub_rsb.res_ls = ls;
543 
544 	ls->ls_debug_rsb_dentry = NULL;
545 	ls->ls_debug_waiters_dentry = NULL;
546 
547 	init_waitqueue_head(&ls->ls_uevent_wait);
548 	ls->ls_uevent_result = 0;
549 	init_completion(&ls->ls_members_done);
550 	ls->ls_members_result = -1;
551 
552 	mutex_init(&ls->ls_cb_mutex);
553 	INIT_LIST_HEAD(&ls->ls_cb_delay);
554 
555 	ls->ls_recoverd_task = NULL;
556 	mutex_init(&ls->ls_recoverd_active);
557 	spin_lock_init(&ls->ls_recover_lock);
558 	spin_lock_init(&ls->ls_rcom_spin);
559 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
560 	ls->ls_recover_status = 0;
561 	ls->ls_recover_seq = 0;
562 	ls->ls_recover_args = NULL;
563 	init_rwsem(&ls->ls_in_recovery);
564 	init_rwsem(&ls->ls_recv_active);
565 	INIT_LIST_HEAD(&ls->ls_requestqueue);
566 	mutex_init(&ls->ls_requestqueue_mutex);
567 	mutex_init(&ls->ls_clear_proc_locks);
568 
569 	ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS);
570 	if (!ls->ls_recover_buf)
571 		goto out_lkbidr;
572 
573 	ls->ls_slot = 0;
574 	ls->ls_num_slots = 0;
575 	ls->ls_slots_size = 0;
576 	ls->ls_slots = NULL;
577 
578 	INIT_LIST_HEAD(&ls->ls_recover_list);
579 	spin_lock_init(&ls->ls_recover_list_lock);
580 	idr_init(&ls->ls_recover_idr);
581 	spin_lock_init(&ls->ls_recover_idr_lock);
582 	ls->ls_recover_list_count = 0;
583 	ls->ls_local_handle = ls;
584 	init_waitqueue_head(&ls->ls_wait_general);
585 	INIT_LIST_HEAD(&ls->ls_root_list);
586 	init_rwsem(&ls->ls_root_sem);
587 
588 	spin_lock(&lslist_lock);
589 	ls->ls_create_count = 1;
590 	list_add(&ls->ls_list, &lslist);
591 	spin_unlock(&lslist_lock);
592 
593 	if (flags & DLM_LSFL_FS) {
594 		error = dlm_callback_start(ls);
595 		if (error) {
596 			log_error(ls, "can't start dlm_callback %d", error);
597 			goto out_delist;
598 		}
599 	}
600 
601 	init_waitqueue_head(&ls->ls_recover_lock_wait);
602 
603 	/*
604 	 * Once started, dlm_recoverd first looks for ls in lslist, then
605 	 * initializes ls_in_recovery as locked in "down" mode.  We need
606 	 * to wait for the wakeup from dlm_recoverd because in_recovery
607 	 * has to start out in down mode.
608 	 */
609 
610 	error = dlm_recoverd_start(ls);
611 	if (error) {
612 		log_error(ls, "can't start dlm_recoverd %d", error);
613 		goto out_callback;
614 	}
615 
616 	wait_event(ls->ls_recover_lock_wait,
617 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
618 
619 	/* let kobject handle freeing of ls if there's an error */
620 	do_unreg = 1;
621 
622 	ls->ls_kobj.kset = dlm_kset;
623 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
624 				     "%s", ls->ls_name);
625 	if (error)
626 		goto out_recoverd;
627 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
628 
629 	/* This uevent triggers dlm_controld in userspace to add us to the
630 	   group of nodes that are members of this lockspace (managed by the
631 	   cluster infrastructure.)  Once it's done that, it tells us who the
632 	   current lockspace members are (via configfs) and then tells the
633 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
634 
635 	error = do_uevent(ls, 1);
636 	if (error)
637 		goto out_recoverd;
638 
639 	wait_for_completion(&ls->ls_members_done);
640 	error = ls->ls_members_result;
641 	if (error)
642 		goto out_members;
643 
644 	dlm_create_debug_file(ls);
645 
646 	log_rinfo(ls, "join complete");
647 	*lockspace = ls;
648 	return 0;
649 
650  out_members:
651 	do_uevent(ls, 0);
652 	dlm_clear_members(ls);
653 	kfree(ls->ls_node_array);
654  out_recoverd:
655 	dlm_recoverd_stop(ls);
656  out_callback:
657 	dlm_callback_stop(ls);
658  out_delist:
659 	spin_lock(&lslist_lock);
660 	list_del(&ls->ls_list);
661 	spin_unlock(&lslist_lock);
662 	idr_destroy(&ls->ls_recover_idr);
663 	kfree(ls->ls_recover_buf);
664  out_lkbidr:
665 	idr_destroy(&ls->ls_lkbidr);
666  out_rsbtbl:
667 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
668 		kfree(ls->ls_remove_names[i]);
669 	vfree(ls->ls_rsbtbl);
670  out_lsfree:
671 	if (do_unreg)
672 		kobject_put(&ls->ls_kobj);
673 	else
674 		kfree(ls);
675  out:
676 	module_put(THIS_MODULE);
677 	return error;
678 }
679 
680 int dlm_new_lockspace(const char *name, const char *cluster,
681 		      uint32_t flags, int lvblen,
682 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
683 		      int *ops_result, dlm_lockspace_t **lockspace)
684 {
685 	int error = 0;
686 
687 	mutex_lock(&ls_lock);
688 	if (!ls_count)
689 		error = threads_start();
690 	if (error)
691 		goto out;
692 
693 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
694 			      ops_result, lockspace);
695 	if (!error)
696 		ls_count++;
697 	if (error > 0)
698 		error = 0;
699 	if (!ls_count) {
700 		dlm_scand_stop();
701 		dlm_lowcomms_shutdown();
702 		dlm_lowcomms_stop();
703 	}
704  out:
705 	mutex_unlock(&ls_lock);
706 	return error;
707 }
708 
709 static int lkb_idr_is_local(int id, void *p, void *data)
710 {
711 	struct dlm_lkb *lkb = p;
712 
713 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
714 }
715 
716 static int lkb_idr_is_any(int id, void *p, void *data)
717 {
718 	return 1;
719 }
720 
721 static int lkb_idr_free(int id, void *p, void *data)
722 {
723 	struct dlm_lkb *lkb = p;
724 
725 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
726 		dlm_free_lvb(lkb->lkb_lvbptr);
727 
728 	dlm_free_lkb(lkb);
729 	return 0;
730 }
731 
732 /* NOTE: We check the lkbidr here rather than the resource table.
733    This is because there may be LKBs queued as ASTs that have been unlinked
734    from their RSBs and are pending deletion once the AST has been delivered */
735 
736 static int lockspace_busy(struct dlm_ls *ls, int force)
737 {
738 	int rv;
739 
740 	spin_lock(&ls->ls_lkbidr_spin);
741 	if (force == 0) {
742 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
743 	} else if (force == 1) {
744 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
745 	} else {
746 		rv = 0;
747 	}
748 	spin_unlock(&ls->ls_lkbidr_spin);
749 	return rv;
750 }
751 
752 static int release_lockspace(struct dlm_ls *ls, int force)
753 {
754 	struct dlm_rsb *rsb;
755 	struct rb_node *n;
756 	int i, busy, rv;
757 
758 	busy = lockspace_busy(ls, force);
759 
760 	spin_lock(&lslist_lock);
761 	if (ls->ls_create_count == 1) {
762 		if (busy) {
763 			rv = -EBUSY;
764 		} else {
765 			/* remove_lockspace takes ls off lslist */
766 			ls->ls_create_count = 0;
767 			rv = 0;
768 		}
769 	} else if (ls->ls_create_count > 1) {
770 		rv = --ls->ls_create_count;
771 	} else {
772 		rv = -EINVAL;
773 	}
774 	spin_unlock(&lslist_lock);
775 
776 	if (rv) {
777 		log_debug(ls, "release_lockspace no remove %d", rv);
778 		return rv;
779 	}
780 
781 	dlm_device_deregister(ls);
782 
783 	if (force < 3 && dlm_user_daemon_available())
784 		do_uevent(ls, 0);
785 
786 	dlm_recoverd_stop(ls);
787 
788 	if (ls_count == 1) {
789 		dlm_scand_stop();
790 		dlm_lowcomms_shutdown();
791 	}
792 
793 	dlm_callback_stop(ls);
794 
795 	remove_lockspace(ls);
796 
797 	dlm_delete_debug_file(ls);
798 
799 	idr_destroy(&ls->ls_recover_idr);
800 	kfree(ls->ls_recover_buf);
801 
802 	/*
803 	 * Free all lkb's in idr
804 	 */
805 
806 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
807 	idr_destroy(&ls->ls_lkbidr);
808 
809 	/*
810 	 * Free all rsb's on rsbtbl[] lists
811 	 */
812 
813 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
814 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
815 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
816 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
817 			dlm_free_rsb(rsb);
818 		}
819 
820 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
821 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
822 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
823 			dlm_free_rsb(rsb);
824 		}
825 	}
826 
827 	vfree(ls->ls_rsbtbl);
828 
829 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
830 		kfree(ls->ls_remove_names[i]);
831 
832 	while (!list_empty(&ls->ls_new_rsb)) {
833 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
834 				       res_hashchain);
835 		list_del(&rsb->res_hashchain);
836 		dlm_free_rsb(rsb);
837 	}
838 
839 	/*
840 	 * Free structures on any other lists
841 	 */
842 
843 	dlm_purge_requestqueue(ls);
844 	kfree(ls->ls_recover_args);
845 	dlm_clear_members(ls);
846 	dlm_clear_members_gone(ls);
847 	kfree(ls->ls_node_array);
848 	log_rinfo(ls, "release_lockspace final free");
849 	kobject_put(&ls->ls_kobj);
850 	/* The ls structure will be freed when the kobject is done with */
851 
852 	module_put(THIS_MODULE);
853 	return 0;
854 }
855 
856 /*
857  * Called when a system has released all its locks and is not going to use the
858  * lockspace any longer.  We free everything we're managing for this lockspace.
859  * Remaining nodes will go through the recovery process as if we'd died.  The
860  * lockspace must continue to function as usual, participating in recoveries,
861  * until this returns.
862  *
863  * Force has 4 possible values:
864  * 0 - don't destroy locksapce if it has any LKBs
865  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
866  * 2 - destroy lockspace regardless of LKBs
867  * 3 - destroy lockspace as part of a forced shutdown
868  */
869 
870 int dlm_release_lockspace(void *lockspace, int force)
871 {
872 	struct dlm_ls *ls;
873 	int error;
874 
875 	ls = dlm_find_lockspace_local(lockspace);
876 	if (!ls)
877 		return -EINVAL;
878 	dlm_put_lockspace(ls);
879 
880 	mutex_lock(&ls_lock);
881 	error = release_lockspace(ls, force);
882 	if (!error)
883 		ls_count--;
884 	if (!ls_count)
885 		dlm_lowcomms_stop();
886 	mutex_unlock(&ls_lock);
887 
888 	return error;
889 }
890 
891 void dlm_stop_lockspaces(void)
892 {
893 	struct dlm_ls *ls;
894 	int count;
895 
896  restart:
897 	count = 0;
898 	spin_lock(&lslist_lock);
899 	list_for_each_entry(ls, &lslist, ls_list) {
900 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
901 			count++;
902 			continue;
903 		}
904 		spin_unlock(&lslist_lock);
905 		log_error(ls, "no userland control daemon, stopping lockspace");
906 		dlm_ls_stop(ls);
907 		goto restart;
908 	}
909 	spin_unlock(&lslist_lock);
910 
911 	if (count)
912 		log_print("dlm user daemon left %d lockspaces", count);
913 }
914 
915