// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct	*scand_task;

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
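
/*
 * These attributes appear under /sys/kernel/dlm/<lockspace name>/ (the
 * "dlm" kset is created under kernel_kobj in dlm_lockspace_init() below).
 * A minimal sketch of how dlm_controld or an operator drives them; the
 * lockspace name "example" is hypothetical:
 *
 *	echo 0 > /sys/kernel/dlm/example/control	stop the lockspace
 *	echo 1 > /sys/kernel/dlm/example/control	start the lockspace
 *	echo 0 > /sys/kernel/dlm/example/event_done	report a join/leave result
 *	echo 1 > /sys/kernel/dlm/example/nodir		force no-directory mode
 *	cat /sys/kernel/dlm/example/id			read the global lockspace id
 */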

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}
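
/*
 * The userspace half of that handshake, as an illustrative sketch (not
 * code from this file): once dlm_controld has handled the ONLINE/OFFLINE
 * uevent and finished group management, it writes the numeric result to
 * the "event_done" attribute, which runs dlm_event_store() and wakes
 * do_uevent().  The path below is hypothetical:
 *
 *	int fd = open("/sys/kernel/dlm/example/event_done", O_WRONLY);
 *	write(fd, "0", 1);	(0 reports success, nonzero a failure code)
 *	close(fd);
 */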

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
		      struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}
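
/*
 * Worked example of the time check above (values assumed for
 * illustration): with HZ=1000 and ci_scan_secs=5, a lockspace last scanned
 * when ls_scan_time was J becomes due once jiffies reaches J + 5000.
 * time_after_eq() compares with wraparound-safe signed arithmetic, so the
 * test stays correct when the jiffies counter overflows.
 */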

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
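
/*
 * Each successful dlm_find_lockspace_*() takes a reference (ls_count++)
 * that the caller must drop with dlm_put_lockspace(); remove_lockspace()
 * below spins until that count reaches zero.  Minimal usage sketch, with a
 * hypothetical global id (dlm_control_store() above follows the same
 * pattern):
 *
 *	struct dlm_ls *ls = dlm_find_lockspace_global(id);
 *	if (ls) {
 *		(use ls)
 *		dlm_put_lockspace(ls);
 *	}
 */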

static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}

static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
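
/*
 * Minimal usage sketch for the exported API above (illustrative only; the
 * name, flags and lvblen are assumptions, though lvblen must always be a
 * nonzero multiple of 8):
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", NULL, 0, 32,
 *				  NULL, NULL, NULL, &ls);
 *	if (error)
 *		return error;	(-EUNATCH if dlm_controld is not running)
 *	(use the lockspace, then drop it with dlm_release_lockspace(ls, 0))
 */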

static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

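/* idr_for_each() returns the first nonzero value returned by a callback
   (or 0 if every callback returned 0), so a nonzero rv below means at
   least one matching lkb was found and the lockspace counts as busy. */
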
static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	return error;
}
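
/*
 * Illustrative call (the error handling is an assumption about a typical
 * caller, not code from this file): a clean shutdown would normally pass
 * force=0, so the release fails with -EBUSY while any LKBs remain, whereas
 * a forced teardown would pass force=3:
 *
 *	error = dlm_release_lockspace(lockspace, 0);
 *	if (error == -EBUSY)
 *		(locks still held; release them and retry)
 */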

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}