xref: /openbmc/linux/fs/dlm/lockspace.c (revision 1cac4f26)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include <linux/module.h>
15 
16 #include "dlm_internal.h"
17 #include "lockspace.h"
18 #include "member.h"
19 #include "recoverd.h"
20 #include "dir.h"
21 #include "lowcomms.h"
22 #include "config.h"
23 #include "memory.h"
24 #include "lock.h"
25 #include "recover.h"
26 #include "requestqueue.h"
27 #include "user.h"
28 #include "ast.h"
29 
30 static int			ls_count;
31 static struct mutex		ls_lock;
32 static struct list_head		lslist;
33 static spinlock_t		lslist_lock;
34 static struct task_struct *	scand_task;
35 
36 
37 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
38 {
39 	ssize_t ret = len;
40 	int n;
41 	int rc = kstrtoint(buf, 0, &n);
42 
43 	if (rc)
44 		return rc;
45 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
46 	if (!ls)
47 		return -EINVAL;
48 
49 	switch (n) {
50 	case 0:
51 		dlm_ls_stop(ls);
52 		break;
53 	case 1:
54 		dlm_ls_start(ls);
55 		break;
56 	default:
57 		ret = -EINVAL;
58 	}
59 	dlm_put_lockspace(ls);
60 	return ret;
61 }
62 
63 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
64 {
65 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
66 
67 	if (rc)
68 		return rc;
69 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
70 	wake_up(&ls->ls_uevent_wait);
71 	return len;
72 }
73 
74 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
75 {
76 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
77 }
78 
79 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
80 {
81 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
82 
83 	if (rc)
84 		return rc;
85 	return len;
86 }
87 
88 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
89 {
90 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
91 }
92 
93 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
94 {
95 	int val;
96 	int rc = kstrtoint(buf, 0, &val);
97 
98 	if (rc)
99 		return rc;
100 	if (val == 1)
101 		set_bit(LSFL_NODIR, &ls->ls_flags);
102 	return len;
103 }
104 
105 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
106 {
107 	uint32_t status = dlm_recover_status(ls);
108 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
109 }
110 
111 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
112 {
113 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
114 }
115 
116 struct dlm_attr {
117 	struct attribute attr;
118 	ssize_t (*show)(struct dlm_ls *, char *);
119 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
120 };
121 
122 static struct dlm_attr dlm_attr_control = {
123 	.attr  = {.name = "control", .mode = S_IWUSR},
124 	.store = dlm_control_store
125 };
126 
127 static struct dlm_attr dlm_attr_event = {
128 	.attr  = {.name = "event_done", .mode = S_IWUSR},
129 	.store = dlm_event_store
130 };
131 
132 static struct dlm_attr dlm_attr_id = {
133 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
134 	.show  = dlm_id_show,
135 	.store = dlm_id_store
136 };
137 
138 static struct dlm_attr dlm_attr_nodir = {
139 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
140 	.show  = dlm_nodir_show,
141 	.store = dlm_nodir_store
142 };
143 
144 static struct dlm_attr dlm_attr_recover_status = {
145 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
146 	.show  = dlm_recover_status_show
147 };
148 
149 static struct dlm_attr dlm_attr_recover_nodeid = {
150 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
151 	.show  = dlm_recover_nodeid_show
152 };
153 
154 static struct attribute *dlm_attrs[] = {
155 	&dlm_attr_control.attr,
156 	&dlm_attr_event.attr,
157 	&dlm_attr_id.attr,
158 	&dlm_attr_nodir.attr,
159 	&dlm_attr_recover_status.attr,
160 	&dlm_attr_recover_nodeid.attr,
161 	NULL,
162 };
163 
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165 			     char *buf)
166 {
167 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169 	return a->show ? a->show(ls, buf) : 0;
170 }
171 
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173 			      const char *buf, size_t len)
174 {
175 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177 	return a->store ? a->store(ls, buf, len) : len;
178 }
179 
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183 	kfree(ls);
184 }
185 
186 static const struct sysfs_ops dlm_attr_ops = {
187 	.show  = dlm_attr_show,
188 	.store = dlm_attr_store,
189 };
190 
191 static struct kobj_type dlm_ktype = {
192 	.default_attrs = dlm_attrs,
193 	.sysfs_ops     = &dlm_attr_ops,
194 	.release       = lockspace_kobj_release,
195 };
196 
197 static struct kset *dlm_kset;
198 
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201 	int error;
202 
203 	if (in)
204 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
205 	else
206 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
207 
208 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
209 
210 	/* dlm_controld will see the uevent, do the necessary group management
211 	   and then write to sysfs to wake us */
212 
213 	error = wait_event_interruptible(ls->ls_uevent_wait,
214 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
215 
216 	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
217 
218 	if (error)
219 		goto out;
220 
221 	error = ls->ls_uevent_result;
222  out:
223 	if (error)
224 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
225 			  error, ls->ls_uevent_result);
226 	return error;
227 }
228 
229 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
230 		      struct kobj_uevent_env *env)
231 {
232 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
233 
234 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
235 	return 0;
236 }
237 
238 static const struct kset_uevent_ops dlm_uevent_ops = {
239 	.uevent = dlm_uevent,
240 };
241 
242 int __init dlm_lockspace_init(void)
243 {
244 	ls_count = 0;
245 	mutex_init(&ls_lock);
246 	INIT_LIST_HEAD(&lslist);
247 	spin_lock_init(&lslist_lock);
248 
249 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
250 	if (!dlm_kset) {
251 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
252 		return -ENOMEM;
253 	}
254 	return 0;
255 }
256 
257 void dlm_lockspace_exit(void)
258 {
259 	kset_unregister(dlm_kset);
260 }
261 
262 static struct dlm_ls *find_ls_to_scan(void)
263 {
264 	struct dlm_ls *ls;
265 
266 	spin_lock(&lslist_lock);
267 	list_for_each_entry(ls, &lslist, ls_list) {
268 		if (time_after_eq(jiffies, ls->ls_scan_time +
269 					    dlm_config.ci_scan_secs * HZ)) {
270 			spin_unlock(&lslist_lock);
271 			return ls;
272 		}
273 	}
274 	spin_unlock(&lslist_lock);
275 	return NULL;
276 }
277 
278 static int dlm_scand(void *data)
279 {
280 	struct dlm_ls *ls;
281 
282 	while (!kthread_should_stop()) {
283 		ls = find_ls_to_scan();
284 		if (ls) {
285 			if (dlm_lock_recovery_try(ls)) {
286 				ls->ls_scan_time = jiffies;
287 				dlm_scan_rsbs(ls);
288 				dlm_scan_timeout(ls);
289 				dlm_scan_waiters(ls);
290 				dlm_unlock_recovery(ls);
291 			} else {
292 				ls->ls_scan_time += HZ;
293 			}
294 			continue;
295 		}
296 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
297 	}
298 	return 0;
299 }
300 
301 static int dlm_scand_start(void)
302 {
303 	struct task_struct *p;
304 	int error = 0;
305 
306 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
307 	if (IS_ERR(p))
308 		error = PTR_ERR(p);
309 	else
310 		scand_task = p;
311 	return error;
312 }
313 
314 static void dlm_scand_stop(void)
315 {
316 	kthread_stop(scand_task);
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 
325 	list_for_each_entry(ls, &lslist, ls_list) {
326 		if (ls->ls_global_id == id) {
327 			ls->ls_count++;
328 			goto out;
329 		}
330 	}
331 	ls = NULL;
332  out:
333 	spin_unlock(&lslist_lock);
334 	return ls;
335 }
336 
337 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
338 {
339 	struct dlm_ls *ls;
340 
341 	spin_lock(&lslist_lock);
342 	list_for_each_entry(ls, &lslist, ls_list) {
343 		if (ls->ls_local_handle == lockspace) {
344 			ls->ls_count++;
345 			goto out;
346 		}
347 	}
348 	ls = NULL;
349  out:
350 	spin_unlock(&lslist_lock);
351 	return ls;
352 }
353 
354 struct dlm_ls *dlm_find_lockspace_device(int minor)
355 {
356 	struct dlm_ls *ls;
357 
358 	spin_lock(&lslist_lock);
359 	list_for_each_entry(ls, &lslist, ls_list) {
360 		if (ls->ls_device.minor == minor) {
361 			ls->ls_count++;
362 			goto out;
363 		}
364 	}
365 	ls = NULL;
366  out:
367 	spin_unlock(&lslist_lock);
368 	return ls;
369 }
370 
371 void dlm_put_lockspace(struct dlm_ls *ls)
372 {
373 	spin_lock(&lslist_lock);
374 	ls->ls_count--;
375 	spin_unlock(&lslist_lock);
376 }
377 
378 static void remove_lockspace(struct dlm_ls *ls)
379 {
380 	for (;;) {
381 		spin_lock(&lslist_lock);
382 		if (ls->ls_count == 0) {
383 			WARN_ON(ls->ls_create_count != 0);
384 			list_del(&ls->ls_list);
385 			spin_unlock(&lslist_lock);
386 			return;
387 		}
388 		spin_unlock(&lslist_lock);
389 		ssleep(1);
390 	}
391 }
392 
393 static int threads_start(void)
394 {
395 	int error;
396 
397 	error = dlm_scand_start();
398 	if (error) {
399 		log_print("cannot start dlm_scand thread %d", error);
400 		goto fail;
401 	}
402 
403 	/* Thread for sending/receiving messages for all lockspace's */
404 	error = dlm_lowcomms_start();
405 	if (error) {
406 		log_print("cannot start dlm lowcomms %d", error);
407 		goto scand_fail;
408 	}
409 
410 	return 0;
411 
412  scand_fail:
413 	dlm_scand_stop();
414  fail:
415 	return error;
416 }
417 
418 static void threads_stop(void)
419 {
420 	dlm_scand_stop();
421 	dlm_lowcomms_stop();
422 }
423 
424 static int new_lockspace(const char *name, const char *cluster,
425 			 uint32_t flags, int lvblen,
426 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
427 			 int *ops_result, dlm_lockspace_t **lockspace)
428 {
429 	struct dlm_ls *ls;
430 	int i, size, error;
431 	int do_unreg = 0;
432 	int namelen = strlen(name);
433 
434 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
435 		return -EINVAL;
436 
437 	if (!lvblen || (lvblen % 8))
438 		return -EINVAL;
439 
440 	if (!try_module_get(THIS_MODULE))
441 		return -EINVAL;
442 
443 	if (!dlm_user_daemon_available()) {
444 		log_print("dlm user daemon not available");
445 		error = -EUNATCH;
446 		goto out;
447 	}
448 
449 	if (ops && ops_result) {
450 	       	if (!dlm_config.ci_recover_callbacks)
451 			*ops_result = -EOPNOTSUPP;
452 		else
453 			*ops_result = 0;
454 	}
455 
456 	if (!cluster)
457 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
458 			  dlm_config.ci_cluster_name);
459 
460 	if (dlm_config.ci_recover_callbacks && cluster &&
461 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
462 		log_print("dlm cluster name '%s' does not match "
463 			  "the application cluster name '%s'",
464 			  dlm_config.ci_cluster_name, cluster);
465 		error = -EBADR;
466 		goto out;
467 	}
468 
469 	error = 0;
470 
471 	spin_lock(&lslist_lock);
472 	list_for_each_entry(ls, &lslist, ls_list) {
473 		WARN_ON(ls->ls_create_count <= 0);
474 		if (ls->ls_namelen != namelen)
475 			continue;
476 		if (memcmp(ls->ls_name, name, namelen))
477 			continue;
478 		if (flags & DLM_LSFL_NEWEXCL) {
479 			error = -EEXIST;
480 			break;
481 		}
482 		ls->ls_create_count++;
483 		*lockspace = ls;
484 		error = 1;
485 		break;
486 	}
487 	spin_unlock(&lslist_lock);
488 
489 	if (error)
490 		goto out;
491 
492 	error = -ENOMEM;
493 
494 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
495 	if (!ls)
496 		goto out;
497 	memcpy(ls->ls_name, name, namelen);
498 	ls->ls_namelen = namelen;
499 	ls->ls_lvblen = lvblen;
500 	ls->ls_count = 0;
501 	ls->ls_flags = 0;
502 	ls->ls_scan_time = jiffies;
503 
504 	if (ops && dlm_config.ci_recover_callbacks) {
505 		ls->ls_ops = ops;
506 		ls->ls_ops_arg = ops_arg;
507 	}
508 
509 	if (flags & DLM_LSFL_TIMEWARN)
510 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
511 
512 	/* ls_exflags are forced to match among nodes, and we don't
513 	   need to require all nodes to have some flags set */
514 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
515 				    DLM_LSFL_NEWEXCL));
516 
517 	size = dlm_config.ci_rsbtbl_size;
518 	ls->ls_rsbtbl_size = size;
519 
520 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
521 	if (!ls->ls_rsbtbl)
522 		goto out_lsfree;
523 	for (i = 0; i < size; i++) {
524 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
525 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
526 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
527 	}
528 
529 	spin_lock_init(&ls->ls_remove_spin);
530 
531 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
532 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
533 						 GFP_KERNEL);
534 		if (!ls->ls_remove_names[i])
535 			goto out_rsbtbl;
536 	}
537 
538 	idr_init(&ls->ls_lkbidr);
539 	spin_lock_init(&ls->ls_lkbidr_spin);
540 
541 	INIT_LIST_HEAD(&ls->ls_waiters);
542 	mutex_init(&ls->ls_waiters_mutex);
543 	INIT_LIST_HEAD(&ls->ls_orphans);
544 	mutex_init(&ls->ls_orphans_mutex);
545 	INIT_LIST_HEAD(&ls->ls_timeout);
546 	mutex_init(&ls->ls_timeout_mutex);
547 
548 	INIT_LIST_HEAD(&ls->ls_new_rsb);
549 	spin_lock_init(&ls->ls_new_rsb_spin);
550 
551 	INIT_LIST_HEAD(&ls->ls_nodes);
552 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
553 	ls->ls_num_nodes = 0;
554 	ls->ls_low_nodeid = 0;
555 	ls->ls_total_weight = 0;
556 	ls->ls_node_array = NULL;
557 
558 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
559 	ls->ls_stub_rsb.res_ls = ls;
560 
561 	ls->ls_debug_rsb_dentry = NULL;
562 	ls->ls_debug_waiters_dentry = NULL;
563 
564 	init_waitqueue_head(&ls->ls_uevent_wait);
565 	ls->ls_uevent_result = 0;
566 	init_completion(&ls->ls_members_done);
567 	ls->ls_members_result = -1;
568 
569 	mutex_init(&ls->ls_cb_mutex);
570 	INIT_LIST_HEAD(&ls->ls_cb_delay);
571 
572 	ls->ls_recoverd_task = NULL;
573 	mutex_init(&ls->ls_recoverd_active);
574 	spin_lock_init(&ls->ls_recover_lock);
575 	spin_lock_init(&ls->ls_rcom_spin);
576 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
577 	ls->ls_recover_status = 0;
578 	ls->ls_recover_seq = 0;
579 	ls->ls_recover_args = NULL;
580 	init_rwsem(&ls->ls_in_recovery);
581 	init_rwsem(&ls->ls_recv_active);
582 	INIT_LIST_HEAD(&ls->ls_requestqueue);
583 	mutex_init(&ls->ls_requestqueue_mutex);
584 	mutex_init(&ls->ls_clear_proc_locks);
585 
586 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
587 	if (!ls->ls_recover_buf)
588 		goto out_lkbidr;
589 
590 	ls->ls_slot = 0;
591 	ls->ls_num_slots = 0;
592 	ls->ls_slots_size = 0;
593 	ls->ls_slots = NULL;
594 
595 	INIT_LIST_HEAD(&ls->ls_recover_list);
596 	spin_lock_init(&ls->ls_recover_list_lock);
597 	idr_init(&ls->ls_recover_idr);
598 	spin_lock_init(&ls->ls_recover_idr_lock);
599 	ls->ls_recover_list_count = 0;
600 	ls->ls_local_handle = ls;
601 	init_waitqueue_head(&ls->ls_wait_general);
602 	INIT_LIST_HEAD(&ls->ls_root_list);
603 	init_rwsem(&ls->ls_root_sem);
604 
605 	spin_lock(&lslist_lock);
606 	ls->ls_create_count = 1;
607 	list_add(&ls->ls_list, &lslist);
608 	spin_unlock(&lslist_lock);
609 
610 	if (flags & DLM_LSFL_FS) {
611 		error = dlm_callback_start(ls);
612 		if (error) {
613 			log_error(ls, "can't start dlm_callback %d", error);
614 			goto out_delist;
615 		}
616 	}
617 
618 	init_waitqueue_head(&ls->ls_recover_lock_wait);
619 
620 	/*
621 	 * Once started, dlm_recoverd first looks for ls in lslist, then
622 	 * initializes ls_in_recovery as locked in "down" mode.  We need
623 	 * to wait for the wakeup from dlm_recoverd because in_recovery
624 	 * has to start out in down mode.
625 	 */
626 
627 	error = dlm_recoverd_start(ls);
628 	if (error) {
629 		log_error(ls, "can't start dlm_recoverd %d", error);
630 		goto out_callback;
631 	}
632 
633 	wait_event(ls->ls_recover_lock_wait,
634 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
635 
636 	ls->ls_kobj.kset = dlm_kset;
637 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
638 				     "%s", ls->ls_name);
639 	if (error)
640 		goto out_recoverd;
641 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
642 
643 	/* let kobject handle freeing of ls if there's an error */
644 	do_unreg = 1;
645 
646 	/* This uevent triggers dlm_controld in userspace to add us to the
647 	   group of nodes that are members of this lockspace (managed by the
648 	   cluster infrastructure.)  Once it's done that, it tells us who the
649 	   current lockspace members are (via configfs) and then tells the
650 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
651 
652 	error = do_uevent(ls, 1);
653 	if (error)
654 		goto out_recoverd;
655 
656 	wait_for_completion(&ls->ls_members_done);
657 	error = ls->ls_members_result;
658 	if (error)
659 		goto out_members;
660 
661 	dlm_create_debug_file(ls);
662 
663 	log_rinfo(ls, "join complete");
664 	*lockspace = ls;
665 	return 0;
666 
667  out_members:
668 	do_uevent(ls, 0);
669 	dlm_clear_members(ls);
670 	kfree(ls->ls_node_array);
671  out_recoverd:
672 	dlm_recoverd_stop(ls);
673  out_callback:
674 	dlm_callback_stop(ls);
675  out_delist:
676 	spin_lock(&lslist_lock);
677 	list_del(&ls->ls_list);
678 	spin_unlock(&lslist_lock);
679 	idr_destroy(&ls->ls_recover_idr);
680 	kfree(ls->ls_recover_buf);
681  out_lkbidr:
682 	idr_destroy(&ls->ls_lkbidr);
683  out_rsbtbl:
684 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
685 		kfree(ls->ls_remove_names[i]);
686 	vfree(ls->ls_rsbtbl);
687  out_lsfree:
688 	if (do_unreg)
689 		kobject_put(&ls->ls_kobj);
690 	else
691 		kfree(ls);
692  out:
693 	module_put(THIS_MODULE);
694 	return error;
695 }
696 
697 int dlm_new_lockspace(const char *name, const char *cluster,
698 		      uint32_t flags, int lvblen,
699 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
700 		      int *ops_result, dlm_lockspace_t **lockspace)
701 {
702 	int error = 0;
703 
704 	mutex_lock(&ls_lock);
705 	if (!ls_count)
706 		error = threads_start();
707 	if (error)
708 		goto out;
709 
710 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
711 			      ops_result, lockspace);
712 	if (!error)
713 		ls_count++;
714 	if (error > 0)
715 		error = 0;
716 	if (!ls_count)
717 		threads_stop();
718  out:
719 	mutex_unlock(&ls_lock);
720 	return error;
721 }
722 
723 static int lkb_idr_is_local(int id, void *p, void *data)
724 {
725 	struct dlm_lkb *lkb = p;
726 
727 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
728 }
729 
730 static int lkb_idr_is_any(int id, void *p, void *data)
731 {
732 	return 1;
733 }
734 
735 static int lkb_idr_free(int id, void *p, void *data)
736 {
737 	struct dlm_lkb *lkb = p;
738 
739 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
740 		dlm_free_lvb(lkb->lkb_lvbptr);
741 
742 	dlm_free_lkb(lkb);
743 	return 0;
744 }
745 
746 /* NOTE: We check the lkbidr here rather than the resource table.
747    This is because there may be LKBs queued as ASTs that have been unlinked
748    from their RSBs and are pending deletion once the AST has been delivered */
749 
750 static int lockspace_busy(struct dlm_ls *ls, int force)
751 {
752 	int rv;
753 
754 	spin_lock(&ls->ls_lkbidr_spin);
755 	if (force == 0) {
756 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
757 	} else if (force == 1) {
758 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
759 	} else {
760 		rv = 0;
761 	}
762 	spin_unlock(&ls->ls_lkbidr_spin);
763 	return rv;
764 }
765 
766 static int release_lockspace(struct dlm_ls *ls, int force)
767 {
768 	struct dlm_rsb *rsb;
769 	struct rb_node *n;
770 	int i, busy, rv;
771 
772 	busy = lockspace_busy(ls, force);
773 
774 	spin_lock(&lslist_lock);
775 	if (ls->ls_create_count == 1) {
776 		if (busy) {
777 			rv = -EBUSY;
778 		} else {
779 			/* remove_lockspace takes ls off lslist */
780 			ls->ls_create_count = 0;
781 			rv = 0;
782 		}
783 	} else if (ls->ls_create_count > 1) {
784 		rv = --ls->ls_create_count;
785 	} else {
786 		rv = -EINVAL;
787 	}
788 	spin_unlock(&lslist_lock);
789 
790 	if (rv) {
791 		log_debug(ls, "release_lockspace no remove %d", rv);
792 		return rv;
793 	}
794 
795 	dlm_device_deregister(ls);
796 
797 	if (force < 3 && dlm_user_daemon_available())
798 		do_uevent(ls, 0);
799 
800 	dlm_recoverd_stop(ls);
801 
802 	dlm_callback_stop(ls);
803 
804 	remove_lockspace(ls);
805 
806 	dlm_delete_debug_file(ls);
807 
808 	idr_destroy(&ls->ls_recover_idr);
809 	kfree(ls->ls_recover_buf);
810 
811 	/*
812 	 * Free all lkb's in idr
813 	 */
814 
815 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
816 	idr_destroy(&ls->ls_lkbidr);
817 
818 	/*
819 	 * Free all rsb's on rsbtbl[] lists
820 	 */
821 
822 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
823 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
824 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
825 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
826 			dlm_free_rsb(rsb);
827 		}
828 
829 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
830 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
831 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
832 			dlm_free_rsb(rsb);
833 		}
834 	}
835 
836 	vfree(ls->ls_rsbtbl);
837 
838 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
839 		kfree(ls->ls_remove_names[i]);
840 
841 	while (!list_empty(&ls->ls_new_rsb)) {
842 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
843 				       res_hashchain);
844 		list_del(&rsb->res_hashchain);
845 		dlm_free_rsb(rsb);
846 	}
847 
848 	/*
849 	 * Free structures on any other lists
850 	 */
851 
852 	dlm_purge_requestqueue(ls);
853 	kfree(ls->ls_recover_args);
854 	dlm_clear_members(ls);
855 	dlm_clear_members_gone(ls);
856 	kfree(ls->ls_node_array);
857 	log_rinfo(ls, "release_lockspace final free");
858 	kobject_put(&ls->ls_kobj);
859 	/* The ls structure will be freed when the kobject is done with */
860 
861 	module_put(THIS_MODULE);
862 	return 0;
863 }
864 
865 /*
866  * Called when a system has released all its locks and is not going to use the
867  * lockspace any longer.  We free everything we're managing for this lockspace.
868  * Remaining nodes will go through the recovery process as if we'd died.  The
869  * lockspace must continue to function as usual, participating in recoveries,
870  * until this returns.
871  *
872  * Force has 4 possible values:
873  * 0 - don't destroy locksapce if it has any LKBs
874  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
875  * 2 - destroy lockspace regardless of LKBs
876  * 3 - destroy lockspace as part of a forced shutdown
877  */
878 
879 int dlm_release_lockspace(void *lockspace, int force)
880 {
881 	struct dlm_ls *ls;
882 	int error;
883 
884 	ls = dlm_find_lockspace_local(lockspace);
885 	if (!ls)
886 		return -EINVAL;
887 	dlm_put_lockspace(ls);
888 
889 	mutex_lock(&ls_lock);
890 	error = release_lockspace(ls, force);
891 	if (!error)
892 		ls_count--;
893 	if (!ls_count)
894 		threads_stop();
895 	mutex_unlock(&ls_lock);
896 
897 	return error;
898 }
899 
900 void dlm_stop_lockspaces(void)
901 {
902 	struct dlm_ls *ls;
903 	int count;
904 
905  restart:
906 	count = 0;
907 	spin_lock(&lslist_lock);
908 	list_for_each_entry(ls, &lslist, ls_list) {
909 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
910 			count++;
911 			continue;
912 		}
913 		spin_unlock(&lslist_lock);
914 		log_error(ls, "no userland control daemon, stopping lockspace");
915 		dlm_ls_stop(ls);
916 		goto restart;
917 	}
918 	spin_unlock(&lslist_lock);
919 
920 	if (count)
921 		log_print("dlm user daemon left %d lockspaces", count);
922 }
923 
924