xref: /openbmc/linux/fs/dlm/lockspace.c (revision f9834f18)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include <linux/module.h>
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n;
39 	int rc = kstrtoint(buf, 0, &n);
40 
41 	if (rc)
42 		return rc;
43 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
44 	if (!ls)
45 		return -EINVAL;
46 
47 	switch (n) {
48 	case 0:
49 		dlm_ls_stop(ls);
50 		break;
51 	case 1:
52 		dlm_ls_start(ls);
53 		break;
54 	default:
55 		ret = -EINVAL;
56 	}
57 	dlm_put_lockspace(ls);
58 	return ret;
59 }
60 
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63 	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64 
65 	if (rc)
66 		return rc;
67 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68 	wake_up(&ls->ls_uevent_wait);
69 	return len;
70 }
71 
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76 
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79 	int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80 
81 	if (rc)
82 		return rc;
83 	return len;
84 }
85 
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88 	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90 
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93 	int val;
94 	int rc = kstrtoint(buf, 0, &val);
95 
96 	if (rc)
97 		return rc;
98 	if (val == 1)
99 		set_bit(LSFL_NODIR, &ls->ls_flags);
100 	return len;
101 }
102 
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105 	uint32_t status = dlm_recover_status(ls);
106 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108 
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113 
114 struct dlm_attr {
115 	struct attribute attr;
116 	ssize_t (*show)(struct dlm_ls *, char *);
117 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119 
120 static struct dlm_attr dlm_attr_control = {
121 	.attr  = {.name = "control", .mode = S_IWUSR},
122 	.store = dlm_control_store
123 };
124 
125 static struct dlm_attr dlm_attr_event = {
126 	.attr  = {.name = "event_done", .mode = S_IWUSR},
127 	.store = dlm_event_store
128 };
129 
130 static struct dlm_attr dlm_attr_id = {
131 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132 	.show  = dlm_id_show,
133 	.store = dlm_id_store
134 };
135 
136 static struct dlm_attr dlm_attr_nodir = {
137 	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138 	.show  = dlm_nodir_show,
139 	.store = dlm_nodir_store
140 };
141 
142 static struct dlm_attr dlm_attr_recover_status = {
143 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
144 	.show  = dlm_recover_status_show
145 };
146 
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149 	.show  = dlm_recover_nodeid_show
150 };
151 
152 static struct attribute *dlm_attrs[] = {
153 	&dlm_attr_control.attr,
154 	&dlm_attr_event.attr,
155 	&dlm_attr_id.attr,
156 	&dlm_attr_nodir.attr,
157 	&dlm_attr_recover_status.attr,
158 	&dlm_attr_recover_nodeid.attr,
159 	NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162 
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164 			     char *buf)
165 {
166 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168 	return a->show ? a->show(ls, buf) : 0;
169 }
170 
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172 			      const char *buf, size_t len)
173 {
174 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176 	return a->store ? a->store(ls, buf, len) : len;
177 }
178 
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182 	kfree(ls);
183 }
184 
185 static const struct sysfs_ops dlm_attr_ops = {
186 	.show  = dlm_attr_show,
187 	.store = dlm_attr_store,
188 };
189 
190 static struct kobj_type dlm_ktype = {
191 	.default_groups = dlm_groups,
192 	.sysfs_ops     = &dlm_attr_ops,
193 	.release       = lockspace_kobj_release,
194 };
195 
196 static struct kset *dlm_kset;
197 
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200 	int error;
201 
202 	if (in)
203 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
204 	else
205 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
206 
207 	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
208 
209 	/* dlm_controld will see the uevent, do the necessary group management
210 	   and then write to sysfs to wake us */
211 
212 	error = wait_event_interruptible(ls->ls_uevent_wait,
213 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
214 
215 	log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
216 
217 	if (error)
218 		goto out;
219 
220 	error = ls->ls_uevent_result;
221  out:
222 	if (error)
223 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
224 			  error, ls->ls_uevent_result);
225 	return error;
226 }
227 
228 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
229 		      struct kobj_uevent_env *env)
230 {
231 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
232 
233 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
234 	return 0;
235 }
236 
237 static const struct kset_uevent_ops dlm_uevent_ops = {
238 	.uevent = dlm_uevent,
239 };
240 
241 int __init dlm_lockspace_init(void)
242 {
243 	ls_count = 0;
244 	mutex_init(&ls_lock);
245 	INIT_LIST_HEAD(&lslist);
246 	spin_lock_init(&lslist_lock);
247 
248 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
249 	if (!dlm_kset) {
250 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
251 		return -ENOMEM;
252 	}
253 	return 0;
254 }
255 
256 void dlm_lockspace_exit(void)
257 {
258 	kset_unregister(dlm_kset);
259 }
260 
261 static struct dlm_ls *find_ls_to_scan(void)
262 {
263 	struct dlm_ls *ls;
264 
265 	spin_lock(&lslist_lock);
266 	list_for_each_entry(ls, &lslist, ls_list) {
267 		if (time_after_eq(jiffies, ls->ls_scan_time +
268 					    dlm_config.ci_scan_secs * HZ)) {
269 			spin_unlock(&lslist_lock);
270 			return ls;
271 		}
272 	}
273 	spin_unlock(&lslist_lock);
274 	return NULL;
275 }
276 
277 static int dlm_scand(void *data)
278 {
279 	struct dlm_ls *ls;
280 
281 	while (!kthread_should_stop()) {
282 		ls = find_ls_to_scan();
283 		if (ls) {
284 			if (dlm_lock_recovery_try(ls)) {
285 				ls->ls_scan_time = jiffies;
286 				dlm_scan_rsbs(ls);
287 				dlm_scan_timeout(ls);
288 				dlm_scan_waiters(ls);
289 				dlm_unlock_recovery(ls);
290 			} else {
291 				ls->ls_scan_time += HZ;
292 			}
293 			continue;
294 		}
295 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
296 	}
297 	return 0;
298 }
299 
300 static int dlm_scand_start(void)
301 {
302 	struct task_struct *p;
303 	int error = 0;
304 
305 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
306 	if (IS_ERR(p))
307 		error = PTR_ERR(p);
308 	else
309 		scand_task = p;
310 	return error;
311 }
312 
313 static void dlm_scand_stop(void)
314 {
315 	kthread_stop(scand_task);
316 }
317 
318 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
319 {
320 	struct dlm_ls *ls;
321 
322 	spin_lock(&lslist_lock);
323 
324 	list_for_each_entry(ls, &lslist, ls_list) {
325 		if (ls->ls_global_id == id) {
326 			ls->ls_count++;
327 			goto out;
328 		}
329 	}
330 	ls = NULL;
331  out:
332 	spin_unlock(&lslist_lock);
333 	return ls;
334 }
335 
336 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
337 {
338 	struct dlm_ls *ls;
339 
340 	spin_lock(&lslist_lock);
341 	list_for_each_entry(ls, &lslist, ls_list) {
342 		if (ls->ls_local_handle == lockspace) {
343 			ls->ls_count++;
344 			goto out;
345 		}
346 	}
347 	ls = NULL;
348  out:
349 	spin_unlock(&lslist_lock);
350 	return ls;
351 }
352 
353 struct dlm_ls *dlm_find_lockspace_device(int minor)
354 {
355 	struct dlm_ls *ls;
356 
357 	spin_lock(&lslist_lock);
358 	list_for_each_entry(ls, &lslist, ls_list) {
359 		if (ls->ls_device.minor == minor) {
360 			ls->ls_count++;
361 			goto out;
362 		}
363 	}
364 	ls = NULL;
365  out:
366 	spin_unlock(&lslist_lock);
367 	return ls;
368 }
369 
370 void dlm_put_lockspace(struct dlm_ls *ls)
371 {
372 	spin_lock(&lslist_lock);
373 	ls->ls_count--;
374 	spin_unlock(&lslist_lock);
375 }
376 
377 static void remove_lockspace(struct dlm_ls *ls)
378 {
379 	for (;;) {
380 		spin_lock(&lslist_lock);
381 		if (ls->ls_count == 0) {
382 			WARN_ON(ls->ls_create_count != 0);
383 			list_del(&ls->ls_list);
384 			spin_unlock(&lslist_lock);
385 			return;
386 		}
387 		spin_unlock(&lslist_lock);
388 		ssleep(1);
389 	}
390 }
391 
392 static int threads_start(void)
393 {
394 	int error;
395 
396 	error = dlm_scand_start();
397 	if (error) {
398 		log_print("cannot start dlm_scand thread %d", error);
399 		goto fail;
400 	}
401 
402 	/* Thread for sending/receiving messages for all lockspace's */
403 	error = dlm_lowcomms_start();
404 	if (error) {
405 		log_print("cannot start dlm lowcomms %d", error);
406 		goto scand_fail;
407 	}
408 
409 	return 0;
410 
411  scand_fail:
412 	dlm_scand_stop();
413  fail:
414 	return error;
415 }
416 
417 static void threads_stop(void)
418 {
419 	dlm_scand_stop();
420 	dlm_lowcomms_stop();
421 }
422 
423 static int new_lockspace(const char *name, const char *cluster,
424 			 uint32_t flags, int lvblen,
425 			 const struct dlm_lockspace_ops *ops, void *ops_arg,
426 			 int *ops_result, dlm_lockspace_t **lockspace)
427 {
428 	struct dlm_ls *ls;
429 	int i, size, error;
430 	int do_unreg = 0;
431 	int namelen = strlen(name);
432 
433 	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
434 		return -EINVAL;
435 
436 	if (!lvblen || (lvblen % 8))
437 		return -EINVAL;
438 
439 	if (!try_module_get(THIS_MODULE))
440 		return -EINVAL;
441 
442 	if (!dlm_user_daemon_available()) {
443 		log_print("dlm user daemon not available");
444 		error = -EUNATCH;
445 		goto out;
446 	}
447 
448 	if (ops && ops_result) {
449 	       	if (!dlm_config.ci_recover_callbacks)
450 			*ops_result = -EOPNOTSUPP;
451 		else
452 			*ops_result = 0;
453 	}
454 
455 	if (!cluster)
456 		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
457 			  dlm_config.ci_cluster_name);
458 
459 	if (dlm_config.ci_recover_callbacks && cluster &&
460 	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
461 		log_print("dlm cluster name '%s' does not match "
462 			  "the application cluster name '%s'",
463 			  dlm_config.ci_cluster_name, cluster);
464 		error = -EBADR;
465 		goto out;
466 	}
467 
468 	error = 0;
469 
470 	spin_lock(&lslist_lock);
471 	list_for_each_entry(ls, &lslist, ls_list) {
472 		WARN_ON(ls->ls_create_count <= 0);
473 		if (ls->ls_namelen != namelen)
474 			continue;
475 		if (memcmp(ls->ls_name, name, namelen))
476 			continue;
477 		if (flags & DLM_LSFL_NEWEXCL) {
478 			error = -EEXIST;
479 			break;
480 		}
481 		ls->ls_create_count++;
482 		*lockspace = ls;
483 		error = 1;
484 		break;
485 	}
486 	spin_unlock(&lslist_lock);
487 
488 	if (error)
489 		goto out;
490 
491 	error = -ENOMEM;
492 
493 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
494 	if (!ls)
495 		goto out;
496 	memcpy(ls->ls_name, name, namelen);
497 	ls->ls_namelen = namelen;
498 	ls->ls_lvblen = lvblen;
499 	ls->ls_count = 0;
500 	ls->ls_flags = 0;
501 	ls->ls_scan_time = jiffies;
502 
503 	if (ops && dlm_config.ci_recover_callbacks) {
504 		ls->ls_ops = ops;
505 		ls->ls_ops_arg = ops_arg;
506 	}
507 
508 	if (flags & DLM_LSFL_TIMEWARN)
509 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
510 
511 	/* ls_exflags are forced to match among nodes, and we don't
512 	   need to require all nodes to have some flags set */
513 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
514 				    DLM_LSFL_NEWEXCL));
515 
516 	size = dlm_config.ci_rsbtbl_size;
517 	ls->ls_rsbtbl_size = size;
518 
519 	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
520 	if (!ls->ls_rsbtbl)
521 		goto out_lsfree;
522 	for (i = 0; i < size; i++) {
523 		ls->ls_rsbtbl[i].keep.rb_node = NULL;
524 		ls->ls_rsbtbl[i].toss.rb_node = NULL;
525 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
526 	}
527 
528 	spin_lock_init(&ls->ls_remove_spin);
529 
530 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
531 		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
532 						 GFP_KERNEL);
533 		if (!ls->ls_remove_names[i])
534 			goto out_rsbtbl;
535 	}
536 
537 	idr_init(&ls->ls_lkbidr);
538 	spin_lock_init(&ls->ls_lkbidr_spin);
539 
540 	INIT_LIST_HEAD(&ls->ls_waiters);
541 	mutex_init(&ls->ls_waiters_mutex);
542 	INIT_LIST_HEAD(&ls->ls_orphans);
543 	mutex_init(&ls->ls_orphans_mutex);
544 	INIT_LIST_HEAD(&ls->ls_timeout);
545 	mutex_init(&ls->ls_timeout_mutex);
546 
547 	INIT_LIST_HEAD(&ls->ls_new_rsb);
548 	spin_lock_init(&ls->ls_new_rsb_spin);
549 
550 	INIT_LIST_HEAD(&ls->ls_nodes);
551 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
552 	ls->ls_num_nodes = 0;
553 	ls->ls_low_nodeid = 0;
554 	ls->ls_total_weight = 0;
555 	ls->ls_node_array = NULL;
556 
557 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
558 	ls->ls_stub_rsb.res_ls = ls;
559 
560 	ls->ls_debug_rsb_dentry = NULL;
561 	ls->ls_debug_waiters_dentry = NULL;
562 
563 	init_waitqueue_head(&ls->ls_uevent_wait);
564 	ls->ls_uevent_result = 0;
565 	init_completion(&ls->ls_members_done);
566 	ls->ls_members_result = -1;
567 
568 	mutex_init(&ls->ls_cb_mutex);
569 	INIT_LIST_HEAD(&ls->ls_cb_delay);
570 
571 	ls->ls_recoverd_task = NULL;
572 	mutex_init(&ls->ls_recoverd_active);
573 	spin_lock_init(&ls->ls_recover_lock);
574 	spin_lock_init(&ls->ls_rcom_spin);
575 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
576 	ls->ls_recover_status = 0;
577 	ls->ls_recover_seq = 0;
578 	ls->ls_recover_args = NULL;
579 	init_rwsem(&ls->ls_in_recovery);
580 	init_rwsem(&ls->ls_recv_active);
581 	INIT_LIST_HEAD(&ls->ls_requestqueue);
582 	mutex_init(&ls->ls_requestqueue_mutex);
583 	mutex_init(&ls->ls_clear_proc_locks);
584 
585 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
586 	if (!ls->ls_recover_buf)
587 		goto out_lkbidr;
588 
589 	ls->ls_slot = 0;
590 	ls->ls_num_slots = 0;
591 	ls->ls_slots_size = 0;
592 	ls->ls_slots = NULL;
593 
594 	INIT_LIST_HEAD(&ls->ls_recover_list);
595 	spin_lock_init(&ls->ls_recover_list_lock);
596 	idr_init(&ls->ls_recover_idr);
597 	spin_lock_init(&ls->ls_recover_idr_lock);
598 	ls->ls_recover_list_count = 0;
599 	ls->ls_local_handle = ls;
600 	init_waitqueue_head(&ls->ls_wait_general);
601 	INIT_LIST_HEAD(&ls->ls_root_list);
602 	init_rwsem(&ls->ls_root_sem);
603 
604 	spin_lock(&lslist_lock);
605 	ls->ls_create_count = 1;
606 	list_add(&ls->ls_list, &lslist);
607 	spin_unlock(&lslist_lock);
608 
609 	if (flags & DLM_LSFL_FS) {
610 		error = dlm_callback_start(ls);
611 		if (error) {
612 			log_error(ls, "can't start dlm_callback %d", error);
613 			goto out_delist;
614 		}
615 	}
616 
617 	init_waitqueue_head(&ls->ls_recover_lock_wait);
618 
619 	/*
620 	 * Once started, dlm_recoverd first looks for ls in lslist, then
621 	 * initializes ls_in_recovery as locked in "down" mode.  We need
622 	 * to wait for the wakeup from dlm_recoverd because in_recovery
623 	 * has to start out in down mode.
624 	 */
625 
626 	error = dlm_recoverd_start(ls);
627 	if (error) {
628 		log_error(ls, "can't start dlm_recoverd %d", error);
629 		goto out_callback;
630 	}
631 
632 	wait_event(ls->ls_recover_lock_wait,
633 		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
634 
635 	ls->ls_kobj.kset = dlm_kset;
636 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
637 				     "%s", ls->ls_name);
638 	if (error)
639 		goto out_recoverd;
640 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
641 
642 	/* let kobject handle freeing of ls if there's an error */
643 	do_unreg = 1;
644 
645 	/* This uevent triggers dlm_controld in userspace to add us to the
646 	   group of nodes that are members of this lockspace (managed by the
647 	   cluster infrastructure.)  Once it's done that, it tells us who the
648 	   current lockspace members are (via configfs) and then tells the
649 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
650 
651 	error = do_uevent(ls, 1);
652 	if (error)
653 		goto out_recoverd;
654 
655 	wait_for_completion(&ls->ls_members_done);
656 	error = ls->ls_members_result;
657 	if (error)
658 		goto out_members;
659 
660 	dlm_create_debug_file(ls);
661 
662 	log_rinfo(ls, "join complete");
663 	*lockspace = ls;
664 	return 0;
665 
666  out_members:
667 	do_uevent(ls, 0);
668 	dlm_clear_members(ls);
669 	kfree(ls->ls_node_array);
670  out_recoverd:
671 	dlm_recoverd_stop(ls);
672  out_callback:
673 	dlm_callback_stop(ls);
674  out_delist:
675 	spin_lock(&lslist_lock);
676 	list_del(&ls->ls_list);
677 	spin_unlock(&lslist_lock);
678 	idr_destroy(&ls->ls_recover_idr);
679 	kfree(ls->ls_recover_buf);
680  out_lkbidr:
681 	idr_destroy(&ls->ls_lkbidr);
682  out_rsbtbl:
683 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
684 		kfree(ls->ls_remove_names[i]);
685 	vfree(ls->ls_rsbtbl);
686  out_lsfree:
687 	if (do_unreg)
688 		kobject_put(&ls->ls_kobj);
689 	else
690 		kfree(ls);
691  out:
692 	module_put(THIS_MODULE);
693 	return error;
694 }
695 
696 int dlm_new_lockspace(const char *name, const char *cluster,
697 		      uint32_t flags, int lvblen,
698 		      const struct dlm_lockspace_ops *ops, void *ops_arg,
699 		      int *ops_result, dlm_lockspace_t **lockspace)
700 {
701 	int error = 0;
702 
703 	mutex_lock(&ls_lock);
704 	if (!ls_count)
705 		error = threads_start();
706 	if (error)
707 		goto out;
708 
709 	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
710 			      ops_result, lockspace);
711 	if (!error)
712 		ls_count++;
713 	if (error > 0)
714 		error = 0;
715 	if (!ls_count)
716 		threads_stop();
717  out:
718 	mutex_unlock(&ls_lock);
719 	return error;
720 }
721 
722 static int lkb_idr_is_local(int id, void *p, void *data)
723 {
724 	struct dlm_lkb *lkb = p;
725 
726 	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
727 }
728 
729 static int lkb_idr_is_any(int id, void *p, void *data)
730 {
731 	return 1;
732 }
733 
734 static int lkb_idr_free(int id, void *p, void *data)
735 {
736 	struct dlm_lkb *lkb = p;
737 
738 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
739 		dlm_free_lvb(lkb->lkb_lvbptr);
740 
741 	dlm_free_lkb(lkb);
742 	return 0;
743 }
744 
745 /* NOTE: We check the lkbidr here rather than the resource table.
746    This is because there may be LKBs queued as ASTs that have been unlinked
747    from their RSBs and are pending deletion once the AST has been delivered */
748 
749 static int lockspace_busy(struct dlm_ls *ls, int force)
750 {
751 	int rv;
752 
753 	spin_lock(&ls->ls_lkbidr_spin);
754 	if (force == 0) {
755 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
756 	} else if (force == 1) {
757 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
758 	} else {
759 		rv = 0;
760 	}
761 	spin_unlock(&ls->ls_lkbidr_spin);
762 	return rv;
763 }
764 
765 static int release_lockspace(struct dlm_ls *ls, int force)
766 {
767 	struct dlm_rsb *rsb;
768 	struct rb_node *n;
769 	int i, busy, rv;
770 
771 	busy = lockspace_busy(ls, force);
772 
773 	spin_lock(&lslist_lock);
774 	if (ls->ls_create_count == 1) {
775 		if (busy) {
776 			rv = -EBUSY;
777 		} else {
778 			/* remove_lockspace takes ls off lslist */
779 			ls->ls_create_count = 0;
780 			rv = 0;
781 		}
782 	} else if (ls->ls_create_count > 1) {
783 		rv = --ls->ls_create_count;
784 	} else {
785 		rv = -EINVAL;
786 	}
787 	spin_unlock(&lslist_lock);
788 
789 	if (rv) {
790 		log_debug(ls, "release_lockspace no remove %d", rv);
791 		return rv;
792 	}
793 
794 	dlm_device_deregister(ls);
795 
796 	if (force < 3 && dlm_user_daemon_available())
797 		do_uevent(ls, 0);
798 
799 	dlm_recoverd_stop(ls);
800 
801 	dlm_callback_stop(ls);
802 
803 	remove_lockspace(ls);
804 
805 	dlm_delete_debug_file(ls);
806 
807 	idr_destroy(&ls->ls_recover_idr);
808 	kfree(ls->ls_recover_buf);
809 
810 	/*
811 	 * Free all lkb's in idr
812 	 */
813 
814 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
815 	idr_destroy(&ls->ls_lkbidr);
816 
817 	/*
818 	 * Free all rsb's on rsbtbl[] lists
819 	 */
820 
821 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
822 		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
823 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
824 			rb_erase(n, &ls->ls_rsbtbl[i].keep);
825 			dlm_free_rsb(rsb);
826 		}
827 
828 		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
829 			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
830 			rb_erase(n, &ls->ls_rsbtbl[i].toss);
831 			dlm_free_rsb(rsb);
832 		}
833 	}
834 
835 	vfree(ls->ls_rsbtbl);
836 
837 	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
838 		kfree(ls->ls_remove_names[i]);
839 
840 	while (!list_empty(&ls->ls_new_rsb)) {
841 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
842 				       res_hashchain);
843 		list_del(&rsb->res_hashchain);
844 		dlm_free_rsb(rsb);
845 	}
846 
847 	/*
848 	 * Free structures on any other lists
849 	 */
850 
851 	dlm_purge_requestqueue(ls);
852 	kfree(ls->ls_recover_args);
853 	dlm_clear_members(ls);
854 	dlm_clear_members_gone(ls);
855 	kfree(ls->ls_node_array);
856 	log_rinfo(ls, "release_lockspace final free");
857 	kobject_put(&ls->ls_kobj);
858 	/* The ls structure will be freed when the kobject is done with */
859 
860 	module_put(THIS_MODULE);
861 	return 0;
862 }
863 
864 /*
865  * Called when a system has released all its locks and is not going to use the
866  * lockspace any longer.  We free everything we're managing for this lockspace.
867  * Remaining nodes will go through the recovery process as if we'd died.  The
868  * lockspace must continue to function as usual, participating in recoveries,
869  * until this returns.
870  *
871  * Force has 4 possible values:
872  * 0 - don't destroy locksapce if it has any LKBs
873  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
874  * 2 - destroy lockspace regardless of LKBs
875  * 3 - destroy lockspace as part of a forced shutdown
876  */
877 
878 int dlm_release_lockspace(void *lockspace, int force)
879 {
880 	struct dlm_ls *ls;
881 	int error;
882 
883 	ls = dlm_find_lockspace_local(lockspace);
884 	if (!ls)
885 		return -EINVAL;
886 	dlm_put_lockspace(ls);
887 
888 	mutex_lock(&ls_lock);
889 	error = release_lockspace(ls, force);
890 	if (!error)
891 		ls_count--;
892 	if (!ls_count)
893 		threads_stop();
894 	mutex_unlock(&ls_lock);
895 
896 	return error;
897 }
898 
899 void dlm_stop_lockspaces(void)
900 {
901 	struct dlm_ls *ls;
902 	int count;
903 
904  restart:
905 	count = 0;
906 	spin_lock(&lslist_lock);
907 	list_for_each_entry(ls, &lslist, ls_list) {
908 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
909 			count++;
910 			continue;
911 		}
912 		spin_unlock(&lslist_lock);
913 		log_error(ls, "no userland control daemon, stopping lockspace");
914 		dlm_ls_stop(ls);
915 		goto restart;
916 	}
917 	spin_unlock(&lslist_lock);
918 
919 	if (count)
920 		log_print("dlm user daemon left %d lockspaces", count);
921 }
922 
923