xref: /openbmc/linux/fs/dlm/lockspace.c (revision e4781421e883340b796da5a724bda7226817990b)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include <linux/module.h>
15 
16 #include "dlm_internal.h"
17 #include "lockspace.h"
18 #include "member.h"
19 #include "recoverd.h"
20 #include "dir.h"
21 #include "lowcomms.h"
22 #include "config.h"
23 #include "memory.h"
24 #include "lock.h"
25 #include "recover.h"
26 #include "requestqueue.h"
27 #include "user.h"
28 #include "ast.h"
29 
30 static int			ls_count;
31 static struct mutex		ls_lock;
32 static struct list_head		lslist;
33 static spinlock_t		lslist_lock;
34 static struct task_struct *	scand_task;
35 
36 
37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 {
39 	ssize_t ret = len;
40 	int n;
41 	int rc = kstrtoint(buf, 0, &n);
42 
43 	if (rc)
44 		return rc;
45 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
46 	if (!ls)
47 		return -EINVAL;
48 
49 	switch (n) {
50 	case 0:
51 		dlm_ls_stop(ls);
52 		break;
53 	case 1:
54 		dlm_ls_start(ls);
55 		break;
56 	default:
57 		ret = -EINVAL;
58 	}
59 	dlm_put_lockspace(ls);
60 	return ret;
61 }
62 
63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
64 {
65 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
66 
67 	if (rc)
68 		return rc;
69 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
70 	wake_up(&ls->ls_uevent_wait);
71 	return len;
72 }
73 
74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
75 {
76 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
77 }
78 
79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
80 {
81 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
82 
83 	if (rc)
84 		return rc;
85 	return len;
86 }
87 
88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
89 {
90 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
91 }
92 
93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
94 {
95 	int val;
96 	int rc = kstrtoint(buf, 0, &val);
97 
98 	if (rc)
99 		return rc;
100 	if (val == 1)
101 		set_bit(LSFL_NODIR, &ls->ls_flags);
102 	return len;
103 }
104 
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107 	uint32_t status = dlm_recover_status(ls);
108 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110 
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115 
116 struct dlm_attr {
117 	struct attribute attr;
118 	ssize_t (*show)(struct dlm_ls *, char *);
119 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121 
122 static struct dlm_attr dlm_attr_control = {
123 	.attr  = {.name = "control", .mode = S_IWUSR},
124 	.store = dlm_control_store
125 };
126 
127 static struct dlm_attr dlm_attr_event = {
128 	.attr  = {.name = "event_done", .mode = S_IWUSR},
129 	.store = dlm_event_store
130 };
131 
132 static struct dlm_attr dlm_attr_id = {
133 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134 	.show  = dlm_id_show,
135 	.store = dlm_id_store
136 };
137 
138 static struct dlm_attr dlm_attr_nodir = {
139 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140 	.show  = dlm_nodir_show,
141 	.store = dlm_nodir_store
142 };
143 
144 static struct dlm_attr dlm_attr_recover_status = {
145 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
146 	.show  = dlm_recover_status_show
147 };
148 
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151 	.show  = dlm_recover_nodeid_show
152 };
153 
154 static struct attribute *dlm_attrs[] = {
155 	&dlm_attr_control.attr,
156 	&dlm_attr_event.attr,
157 	&dlm_attr_id.attr,
158 	&dlm_attr_nodir.attr,
159 	&dlm_attr_recover_status.attr,
160 	&dlm_attr_recover_nodeid.attr,
161 	NULL,
162 };
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
186 static const struct sysfs_ops dlm_attr_ops = {
187 	.show  = dlm_attr_show,
188 	.store = dlm_attr_store,
189 };
190 
191 static struct kobj_type dlm_ktype = {
192 	.default_attrs = dlm_attrs,
193 	.sysfs_ops     = &dlm_attr_ops,
194 	.release       = lockspace_kobj_release,
195 };
196 
197 static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	int error;
202 
203 	if (in)
204 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205 	else
206 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207 
208 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209 
210 	/* dlm_controld will see the uevent, do the necessary group management
211 	   and then write to sysfs to wake us */
212 
213 	error = wait_event_interruptible(ls->ls_uevent_wait,
214 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215 
216 	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217 
218 	if (error)
219 		goto out;
220 
221 	error = ls->ls_uevent_result;
222  out:
223 	if (error)
224 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225 			  error, ls->ls_uevent_result);
226 	return error;
227 }
228 
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230 		      struct kobj_uevent_env *env)
231 {
232 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233 
234 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235 	return 0;
236 }
237 
238 static struct kset_uevent_ops dlm_uevent_ops = {
239 	.uevent = dlm_uevent,
240 };
241 
242 int __init dlm_lockspace_init(void)
243 {
244 	ls_count = 0;
245 	mutex_init(&ls_lock);
246 	INIT_LIST_HEAD(&lslist);
247 	spin_lock_init(&lslist_lock);
248 
249 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250 	if (!dlm_kset) {
251 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
252 		return -ENOMEM;
253 	}
254 	return 0;
255 }
256 
257 void dlm_lockspace_exit(void)
258 {
259 	kset_unregister(dlm_kset);
260 }
261 
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264 	struct dlm_ls *ls;
265 
266 	spin_lock(&lslist_lock);
267 	list_for_each_entry(ls, &lslist, ls_list) {
268 		if (time_after_eq(jiffies, ls->ls_scan_time +
269 					    dlm_config.ci_scan_secs * HZ)) {
270 			spin_unlock(&lslist_lock);
271 			return ls;
272 		}
273 	}
274 	spin_unlock(&lslist_lock);
275 	return NULL;
276 }
277 
278 static int dlm_scand(void *data)
279 {
280 	struct dlm_ls *ls;
281 
282 	while (!kthread_should_stop()) {
283 		ls = find_ls_to_scan();
284 		if (ls) {
285 			if (dlm_lock_recovery_try(ls)) {
286 				ls->ls_scan_time = jiffies;
287 				dlm_scan_rsbs(ls);
288 				dlm_scan_timeout(ls);
289 				dlm_scan_waiters(ls);
290 				dlm_unlock_recovery(ls);
291 			} else {
292 				ls->ls_scan_time += HZ;
293 			}
294 			continue;
295 		}
296 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297 	}
298 	return 0;
299 }
300 
301 static int dlm_scand_start(void)
302 {
303 	struct task_struct *p;
304 	int error = 0;
305 
306 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
307 	if (IS_ERR(p))
308 		error = PTR_ERR(p);
309 	else
310 		scand_task = p;
311 	return error;
312 }
313 
314 static void dlm_scand_stop(void)
315 {
316 	kthread_stop(scand_task);
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 
325 	list_for_each_entry(ls, &lslist, ls_list) {
326 		if (ls->ls_global_id == id) {
327 			ls->ls_count++;
328 			goto out;
329 		}
330 	}
331 	ls = NULL;
332  out:
333 	spin_unlock(&lslist_lock);
334 	return ls;
335 }
336 
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339 	struct dlm_ls *ls;
340 
341 	spin_lock(&lslist_lock);
342 	list_for_each_entry(ls, &lslist, ls_list) {
343 		if (ls->ls_local_handle == lockspace) {
344 			ls->ls_count++;
345 			goto out;
346 		}
347 	}
348 	ls = NULL;
349  out:
350 	spin_unlock(&lslist_lock);
351 	return ls;
352 }
353 
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356 	struct dlm_ls *ls;
357 
358 	spin_lock(&lslist_lock);
359 	list_for_each_entry(ls, &lslist, ls_list) {
360 		if (ls->ls_device.minor == minor) {
361 			ls->ls_count++;
362 			goto out;
363 		}
364 	}
365 	ls = NULL;
366  out:
367 	spin_unlock(&lslist_lock);
368 	return ls;
369 }
370 
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373 	spin_lock(&lslist_lock);
374 	ls->ls_count--;
375 	spin_unlock(&lslist_lock);
376 }
377 
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380 	for (;;) {
381 		spin_lock(&lslist_lock);
382 		if (ls->ls_count == 0) {
383 			WARN_ON(ls->ls_create_count != 0);
384 			list_del(&ls->ls_list);
385 			spin_unlock(&lslist_lock);
386 			return;
387 		}
388 		spin_unlock(&lslist_lock);
389 		ssleep(1);
390 	}
391 }
392 
393 static int threads_start(void)
394 {
395 	int error;
396 
397 	error = dlm_scand_start();
398 	if (error) {
399 		log_print("cannot start dlm_scand thread %d", error);
400 		goto fail;
401 	}
402 
403 	/* Thread for sending/receiving messages for all lockspace's */
404 	error = dlm_lowcomms_start();
405 	if (error) {
406 		log_print("cannot start dlm lowcomms %d", error);
407 		goto scand_fail;
408 	}
409 
410 	return 0;
411 
412  scand_fail:
413 	dlm_scand_stop();
414  fail:
415 	return error;
416 }
417 
418 static void threads_stop(void)
419 {
420 	dlm_scand_stop();
421 	dlm_lowcomms_stop();
422 }
423 
424 static int new_lockspace(const char *name, const char *cluster,
425 			 uint32_t flags, int lvblen,
426 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
427 			 int *ops_result, dlm_lockspace_t **lockspace)
428 {
429 	struct dlm_ls *ls;
430 	int i, size, error;
431 	int do_unreg = 0;
432 	int namelen = strlen(name);
433 
434 	if (namelen > DLM_LOCKSPACE_LEN)
435 		return -EINVAL;
436 
437 	if (!lvblen || (lvblen % 8))
438 		return -EINVAL;
439 
440 	if (!try_module_get(THIS_MODULE))
441 		return -EINVAL;
442 
443 	if (!dlm_user_daemon_available()) {
444 		log_print("dlm user daemon not available");
445 		error = -EUNATCH;
446 		goto out;
447 	}
448 
449 	if (ops && ops_result) {
450 	       	if (!dlm_config.ci_recover_callbacks)
451 			*ops_result = -EOPNOTSUPP;
452 		else
453 			*ops_result = 0;
454 	}
455 
456 	if (dlm_config.ci_recover_callbacks && cluster &&
457 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
458 		log_print("dlm cluster name %s mismatch %s",
459 			  dlm_config.ci_cluster_name, cluster);
460 		error = -EBADR;
461 		goto out;
462 	}
463 
464 	error = 0;
465 
466 	spin_lock(&lslist_lock);
467 	list_for_each_entry(ls, &lslist, ls_list) {
468 		WARN_ON(ls->ls_create_count <= 0);
469 		if (ls->ls_namelen != namelen)
470 			continue;
471 		if (memcmp(ls->ls_name, name, namelen))
472 			continue;
473 		if (flags & DLM_LSFL_NEWEXCL) {
474 			error = -EEXIST;
475 			break;
476 		}
477 		ls->ls_create_count++;
478 		*lockspace = ls;
479 		error = 1;
480 		break;
481 	}
482 	spin_unlock(&lslist_lock);
483 
484 	if (error)
485 		goto out;
486 
487 	error = -ENOMEM;
488 
489 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
490 	if (!ls)
491 		goto out;
492 	memcpy(ls->ls_name, name, namelen);
493 	ls->ls_namelen = namelen;
494 	ls->ls_lvblen = lvblen;
495 	ls->ls_count = 0;
496 	ls->ls_flags = 0;
497 	ls->ls_scan_time = jiffies;
498 
499 	if (ops && dlm_config.ci_recover_callbacks) {
500 		ls->ls_ops = ops;
501 		ls->ls_ops_arg = ops_arg;
502 	}
503 
504 	if (flags & DLM_LSFL_TIMEWARN)
505 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
506 
507 	/* ls_exflags are forced to match among nodes, and we don't
508 	   need to require all nodes to have some flags set */
509 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
510 				    DLM_LSFL_NEWEXCL));
511 
512 	size = dlm_config.ci_rsbtbl_size;
513 	ls->ls_rsbtbl_size = size;
514 
515 	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
516 	if (!ls->ls_rsbtbl)
517 		goto out_lsfree;
518 	for (i = 0; i < size; i++) {
519 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
520 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
521 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
522 	}
523 
524 	spin_lock_init(&ls->ls_remove_spin);
525 
526 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
527 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
528 						 GFP_KERNEL);
529 		if (!ls->ls_remove_names[i])
530 			goto out_rsbtbl;
531 	}
532 
533 	idr_init(&ls->ls_lkbidr);
534 	spin_lock_init(&ls->ls_lkbidr_spin);
535 
536 	INIT_LIST_HEAD(&ls->ls_waiters);
537 	mutex_init(&ls->ls_waiters_mutex);
538 	INIT_LIST_HEAD(&ls->ls_orphans);
539 	mutex_init(&ls->ls_orphans_mutex);
540 	INIT_LIST_HEAD(&ls->ls_timeout);
541 	mutex_init(&ls->ls_timeout_mutex);
542 
543 	INIT_LIST_HEAD(&ls->ls_new_rsb);
544 	spin_lock_init(&ls->ls_new_rsb_spin);
545 
546 	INIT_LIST_HEAD(&ls->ls_nodes);
547 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
548 	ls->ls_num_nodes = 0;
549 	ls->ls_low_nodeid = 0;
550 	ls->ls_total_weight = 0;
551 	ls->ls_node_array = NULL;
552 
553 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
554 	ls->ls_stub_rsb.res_ls = ls;
555 
556 	ls->ls_debug_rsb_dentry = NULL;
557 	ls->ls_debug_waiters_dentry = NULL;
558 
559 	init_waitqueue_head(&ls->ls_uevent_wait);
560 	ls->ls_uevent_result = 0;
561 	init_completion(&ls->ls_members_done);
562 	ls->ls_members_result = -1;
563 
564 	mutex_init(&ls->ls_cb_mutex);
565 	INIT_LIST_HEAD(&ls->ls_cb_delay);
566 
567 	ls->ls_recoverd_task = NULL;
568 	mutex_init(&ls->ls_recoverd_active);
569 	spin_lock_init(&ls->ls_recover_lock);
570 	spin_lock_init(&ls->ls_rcom_spin);
571 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
572 	ls->ls_recover_status = 0;
573 	ls->ls_recover_seq = 0;
574 	ls->ls_recover_args = NULL;
575 	init_rwsem(&ls->ls_in_recovery);
576 	init_rwsem(&ls->ls_recv_active);
577 	INIT_LIST_HEAD(&ls->ls_requestqueue);
578 	mutex_init(&ls->ls_requestqueue_mutex);
579 	mutex_init(&ls->ls_clear_proc_locks);
580 
581 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
582 	if (!ls->ls_recover_buf)
583 		goto out_lkbidr;
584 
585 	ls->ls_slot = 0;
586 	ls->ls_num_slots = 0;
587 	ls->ls_slots_size = 0;
588 	ls->ls_slots = NULL;
589 
590 	INIT_LIST_HEAD(&ls->ls_recover_list);
591 	spin_lock_init(&ls->ls_recover_list_lock);
592 	idr_init(&ls->ls_recover_idr);
593 	spin_lock_init(&ls->ls_recover_idr_lock);
594 	ls->ls_recover_list_count = 0;
595 	ls->ls_local_handle = ls;
596 	init_waitqueue_head(&ls->ls_wait_general);
597 	INIT_LIST_HEAD(&ls->ls_root_list);
598 	init_rwsem(&ls->ls_root_sem);
599 
600 	spin_lock(&lslist_lock);
601 	ls->ls_create_count = 1;
602 	list_add(&ls->ls_list, &lslist);
603 	spin_unlock(&lslist_lock);
604 
605 	if (flags & DLM_LSFL_FS) {
606 		error = dlm_callback_start(ls);
607 		if (error) {
608 			log_error(ls, "can't start dlm_callback %d", error);
609 			goto out_delist;
610 		}
611 	}
612 
613 	init_waitqueue_head(&ls->ls_recover_lock_wait);
614 
615 	/*
616 	 * Once started, dlm_recoverd first looks for ls in lslist, then
617 	 * initializes ls_in_recovery as locked in "down" mode.  We need
618 	 * to wait for the wakeup from dlm_recoverd because in_recovery
619 	 * has to start out in down mode.
620 	 */
621 
622 	error = dlm_recoverd_start(ls);
623 	if (error) {
624 		log_error(ls, "can't start dlm_recoverd %d", error);
625 		goto out_callback;
626 	}
627 
628 	wait_event(ls->ls_recover_lock_wait,
629 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
630 
631 	ls->ls_kobj.kset = dlm_kset;
632 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
633 				     "%s", ls->ls_name);
634 	if (error)
635 		goto out_recoverd;
636 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
637 
638 	/* let kobject handle freeing of ls if there's an error */
639 	do_unreg = 1;
640 
641 	/* This uevent triggers dlm_controld in userspace to add us to the
642 	   group of nodes that are members of this lockspace (managed by the
643 	   cluster infrastructure.)  Once it's done that, it tells us who the
644 	   current lockspace members are (via configfs) and then tells the
645 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
646 
647 	error = do_uevent(ls, 1);
648 	if (error)
649 		goto out_recoverd;
650 
651 	wait_for_completion(&ls->ls_members_done);
652 	error = ls->ls_members_result;
653 	if (error)
654 		goto out_members;
655 
656 	dlm_create_debug_file(ls);
657 
658 	log_rinfo(ls, "join complete");
659 	*lockspace = ls;
660 	return 0;
661 
662  out_members:
663 	do_uevent(ls, 0);
664 	dlm_clear_members(ls);
665 	kfree(ls->ls_node_array);
666  out_recoverd:
667 	dlm_recoverd_stop(ls);
668  out_callback:
669 	dlm_callback_stop(ls);
670  out_delist:
671 	spin_lock(&lslist_lock);
672 	list_del(&ls->ls_list);
673 	spin_unlock(&lslist_lock);
674 	idr_destroy(&ls->ls_recover_idr);
675 	kfree(ls->ls_recover_buf);
676  out_lkbidr:
677 	idr_destroy(&ls->ls_lkbidr);
678 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
679 		if (ls->ls_remove_names[i])
680 			kfree(ls->ls_remove_names[i]);
681 	}
682  out_rsbtbl:
683 	vfree(ls->ls_rsbtbl);
684  out_lsfree:
685 	if (do_unreg)
686 		kobject_put(&ls->ls_kobj);
687 	else
688 		kfree(ls);
689  out:
690 	module_put(THIS_MODULE);
691 	return error;
692 }
693 
694 int dlm_new_lockspace(const char *name, const char *cluster,
695 		      uint32_t flags, int lvblen,
696 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
697 		      int *ops_result, dlm_lockspace_t **lockspace)
698 {
699 	int error = 0;
700 
701 	mutex_lock(&ls_lock);
702 	if (!ls_count)
703 		error = threads_start();
704 	if (error)
705 		goto out;
706 
707 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
708 			      ops_result, lockspace);
709 	if (!error)
710 		ls_count++;
711 	if (error > 0)
712 		error = 0;
713 	if (!ls_count)
714 		threads_stop();
715  out:
716 	mutex_unlock(&ls_lock);
717 	return error;
718 }
719 
720 static int lkb_idr_is_local(int id, void *p, void *data)
721 {
722 	struct dlm_lkb *lkb = p;
723 
724 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
725 }
726 
727 static int lkb_idr_is_any(int id, void *p, void *data)
728 {
729 	return 1;
730 }
731 
732 static int lkb_idr_free(int id, void *p, void *data)
733 {
734 	struct dlm_lkb *lkb = p;
735 
736 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
737 		dlm_free_lvb(lkb->lkb_lvbptr);
738 
739 	dlm_free_lkb(lkb);
740 	return 0;
741 }
742 
743 /* NOTE: We check the lkbidr here rather than the resource table.
744    This is because there may be LKBs queued as ASTs that have been unlinked
745    from their RSBs and are pending deletion once the AST has been delivered */
746 
747 static int lockspace_busy(struct dlm_ls *ls, int force)
748 {
749 	int rv;
750 
751 	spin_lock(&ls->ls_lkbidr_spin);
752 	if (force == 0) {
753 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
754 	} else if (force == 1) {
755 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
756 	} else {
757 		rv = 0;
758 	}
759 	spin_unlock(&ls->ls_lkbidr_spin);
760 	return rv;
761 }
762 
763 static int release_lockspace(struct dlm_ls *ls, int force)
764 {
765 	struct dlm_rsb *rsb;
766 	struct rb_node *n;
767 	int i, busy, rv;
768 
769 	busy = lockspace_busy(ls, force);
770 
771 	spin_lock(&lslist_lock);
772 	if (ls->ls_create_count == 1) {
773 		if (busy) {
774 			rv = -EBUSY;
775 		} else {
776 			/* remove_lockspace takes ls off lslist */
777 			ls->ls_create_count = 0;
778 			rv = 0;
779 		}
780 	} else if (ls->ls_create_count > 1) {
781 		rv = --ls->ls_create_count;
782 	} else {
783 		rv = -EINVAL;
784 	}
785 	spin_unlock(&lslist_lock);
786 
787 	if (rv) {
788 		log_debug(ls, "release_lockspace no remove %d", rv);
789 		return rv;
790 	}
791 
792 	dlm_device_deregister(ls);
793 
794 	if (force < 3 && dlm_user_daemon_available())
795 		do_uevent(ls, 0);
796 
797 	dlm_recoverd_stop(ls);
798 
799 	dlm_callback_stop(ls);
800 
801 	remove_lockspace(ls);
802 
803 	dlm_delete_debug_file(ls);
804 
805 	kfree(ls->ls_recover_buf);
806 
807 	/*
808 	 * Free all lkb's in idr
809 	 */
810 
811 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
812 	idr_destroy(&ls->ls_lkbidr);
813 
814 	/*
815 	 * Free all rsb's on rsbtbl[] lists
816 	 */
817 
818 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
819 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
820 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
821 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
822 			dlm_free_rsb(rsb);
823 		}
824 
825 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
826 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
827 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
828 			dlm_free_rsb(rsb);
829 		}
830 	}
831 
832 	vfree(ls->ls_rsbtbl);
833 
834 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
835 		kfree(ls->ls_remove_names[i]);
836 
837 	while (!list_empty(&ls->ls_new_rsb)) {
838 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
839 				       res_hashchain);
840 		list_del(&rsb->res_hashchain);
841 		dlm_free_rsb(rsb);
842 	}
843 
844 	/*
845 	 * Free structures on any other lists
846 	 */
847 
848 	dlm_purge_requestqueue(ls);
849 	kfree(ls->ls_recover_args);
850 	dlm_clear_members(ls);
851 	dlm_clear_members_gone(ls);
852 	kfree(ls->ls_node_array);
853 	log_rinfo(ls, "release_lockspace final free");
854 	kobject_put(&ls->ls_kobj);
855 	/* The ls structure will be freed when the kobject is done with */
856 
857 	module_put(THIS_MODULE);
858 	return 0;
859 }
860 
861 /*
862  * Called when a system has released all its locks and is not going to use the
863  * lockspace any longer.  We free everything we're managing for this lockspace.
864  * Remaining nodes will go through the recovery process as if we'd died.  The
865  * lockspace must continue to function as usual, participating in recoveries,
866  * until this returns.
867  *
868  * Force has 4 possible values:
869  * 0 - don't destroy locksapce if it has any LKBs
870  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
871  * 2 - destroy lockspace regardless of LKBs
872  * 3 - destroy lockspace as part of a forced shutdown
873  */
874 
875 int dlm_release_lockspace(void *lockspace, int force)
876 {
877 	struct dlm_ls *ls;
878 	int error;
879 
880 	ls = dlm_find_lockspace_local(lockspace);
881 	if (!ls)
882 		return -EINVAL;
883 	dlm_put_lockspace(ls);
884 
885 	mutex_lock(&ls_lock);
886 	error = release_lockspace(ls, force);
887 	if (!error)
888 		ls_count--;
889 	if (!ls_count)
890 		threads_stop();
891 	mutex_unlock(&ls_lock);
892 
893 	return error;
894 }
895 
896 void dlm_stop_lockspaces(void)
897 {
898 	struct dlm_ls *ls;
899 	int count;
900 
901  restart:
902 	count = 0;
903 	spin_lock(&lslist_lock);
904 	list_for_each_entry(ls, &lslist, ls_list) {
905 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
906 			count++;
907 			continue;
908 		}
909 		spin_unlock(&lslist_lock);
910 		log_error(ls, "no userland control daemon, stopping lockspace");
911 		dlm_ls_stop(ls);
912 		goto restart;
913 	}
914 	spin_unlock(&lslist_lock);
915 
916 	if (count)
917 		log_print("dlm user daemon left %d lockspaces", count);
918 }
919 
920