xref: /openbmc/linux/fs/dlm/lockspace.c (revision 75f25bd3)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 	uint32_t status = dlm_recover_status(ls);
80 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82 
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87 
88 struct dlm_attr {
89 	struct attribute attr;
90 	ssize_t (*show)(struct dlm_ls *, char *);
91 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93 
94 static struct dlm_attr dlm_attr_control = {
95 	.attr  = {.name = "control", .mode = S_IWUSR},
96 	.store = dlm_control_store
97 };
98 
99 static struct dlm_attr dlm_attr_event = {
100 	.attr  = {.name = "event_done", .mode = S_IWUSR},
101 	.store = dlm_event_store
102 };
103 
104 static struct dlm_attr dlm_attr_id = {
105 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 	.show  = dlm_id_show,
107 	.store = dlm_id_store
108 };
109 
110 static struct dlm_attr dlm_attr_recover_status = {
111 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112 	.show  = dlm_recover_status_show
113 };
114 
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117 	.show  = dlm_recover_nodeid_show
118 };
119 
120 static struct attribute *dlm_attrs[] = {
121 	&dlm_attr_control.attr,
122 	&dlm_attr_event.attr,
123 	&dlm_attr_id.attr,
124 	&dlm_attr_recover_status.attr,
125 	&dlm_attr_recover_nodeid.attr,
126 	NULL,
127 };
128 
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 			     char *buf)
131 {
132 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 	return a->show ? a->show(ls, buf) : 0;
135 }
136 
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 			      const char *buf, size_t len)
139 {
140 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 	return a->store ? a->store(ls, buf, len) : len;
143 }
144 
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148 	kfree(ls);
149 }
150 
151 static const struct sysfs_ops dlm_attr_ops = {
152 	.show  = dlm_attr_show,
153 	.store = dlm_attr_store,
154 };
155 
156 static struct kobj_type dlm_ktype = {
157 	.default_attrs = dlm_attrs,
158 	.sysfs_ops     = &dlm_attr_ops,
159 	.release       = lockspace_kobj_release,
160 };
161 
162 static struct kset *dlm_kset;
163 
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 	int error;
167 
168 	if (in)
169 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 	else
171 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 
173 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 
175 	/* dlm_controld will see the uevent, do the necessary group management
176 	   and then write to sysfs to wake us */
177 
178 	error = wait_event_interruptible(ls->ls_uevent_wait,
179 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 
181 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 
183 	if (error)
184 		goto out;
185 
186 	error = ls->ls_uevent_result;
187  out:
188 	if (error)
189 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 			  error, ls->ls_uevent_result);
191 	return error;
192 }
193 
194 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195 		      struct kobj_uevent_env *env)
196 {
197 	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198 
199 	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200 	return 0;
201 }
202 
203 static struct kset_uevent_ops dlm_uevent_ops = {
204 	.uevent = dlm_uevent,
205 };
206 
207 int __init dlm_lockspace_init(void)
208 {
209 	ls_count = 0;
210 	mutex_init(&ls_lock);
211 	INIT_LIST_HEAD(&lslist);
212 	spin_lock_init(&lslist_lock);
213 
214 	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215 	if (!dlm_kset) {
216 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
217 		return -ENOMEM;
218 	}
219 	return 0;
220 }
221 
222 void dlm_lockspace_exit(void)
223 {
224 	kset_unregister(dlm_kset);
225 }
226 
227 static struct dlm_ls *find_ls_to_scan(void)
228 {
229 	struct dlm_ls *ls;
230 
231 	spin_lock(&lslist_lock);
232 	list_for_each_entry(ls, &lslist, ls_list) {
233 		if (time_after_eq(jiffies, ls->ls_scan_time +
234 					    dlm_config.ci_scan_secs * HZ)) {
235 			spin_unlock(&lslist_lock);
236 			return ls;
237 		}
238 	}
239 	spin_unlock(&lslist_lock);
240 	return NULL;
241 }
242 
243 static int dlm_scand(void *data)
244 {
245 	struct dlm_ls *ls;
246 
247 	while (!kthread_should_stop()) {
248 		ls = find_ls_to_scan();
249 		if (ls) {
250 			if (dlm_lock_recovery_try(ls)) {
251 				ls->ls_scan_time = jiffies;
252 				dlm_scan_rsbs(ls);
253 				dlm_scan_timeout(ls);
254 				dlm_scan_waiters(ls);
255 				dlm_unlock_recovery(ls);
256 			} else {
257 				ls->ls_scan_time += HZ;
258 			}
259 			continue;
260 		}
261 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262 	}
263 	return 0;
264 }
265 
266 static int dlm_scand_start(void)
267 {
268 	struct task_struct *p;
269 	int error = 0;
270 
271 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
272 	if (IS_ERR(p))
273 		error = PTR_ERR(p);
274 	else
275 		scand_task = p;
276 	return error;
277 }
278 
279 static void dlm_scand_stop(void)
280 {
281 	kthread_stop(scand_task);
282 }
283 
284 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
285 {
286 	struct dlm_ls *ls;
287 
288 	spin_lock(&lslist_lock);
289 
290 	list_for_each_entry(ls, &lslist, ls_list) {
291 		if (ls->ls_global_id == id) {
292 			ls->ls_count++;
293 			goto out;
294 		}
295 	}
296 	ls = NULL;
297  out:
298 	spin_unlock(&lslist_lock);
299 	return ls;
300 }
301 
302 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
303 {
304 	struct dlm_ls *ls;
305 
306 	spin_lock(&lslist_lock);
307 	list_for_each_entry(ls, &lslist, ls_list) {
308 		if (ls->ls_local_handle == lockspace) {
309 			ls->ls_count++;
310 			goto out;
311 		}
312 	}
313 	ls = NULL;
314  out:
315 	spin_unlock(&lslist_lock);
316 	return ls;
317 }
318 
319 struct dlm_ls *dlm_find_lockspace_device(int minor)
320 {
321 	struct dlm_ls *ls;
322 
323 	spin_lock(&lslist_lock);
324 	list_for_each_entry(ls, &lslist, ls_list) {
325 		if (ls->ls_device.minor == minor) {
326 			ls->ls_count++;
327 			goto out;
328 		}
329 	}
330 	ls = NULL;
331  out:
332 	spin_unlock(&lslist_lock);
333 	return ls;
334 }
335 
336 void dlm_put_lockspace(struct dlm_ls *ls)
337 {
338 	spin_lock(&lslist_lock);
339 	ls->ls_count--;
340 	spin_unlock(&lslist_lock);
341 }
342 
343 static void remove_lockspace(struct dlm_ls *ls)
344 {
345 	for (;;) {
346 		spin_lock(&lslist_lock);
347 		if (ls->ls_count == 0) {
348 			WARN_ON(ls->ls_create_count != 0);
349 			list_del(&ls->ls_list);
350 			spin_unlock(&lslist_lock);
351 			return;
352 		}
353 		spin_unlock(&lslist_lock);
354 		ssleep(1);
355 	}
356 }
357 
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * lowcomms.  If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int rv;

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		return rv;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		return rv;
	}

	return 0;
}
382 
/* Stop what threads_start() started: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();

	dlm_lowcomms_stop();
}
388 
389 static int new_lockspace(const char *name, int namelen, void **lockspace,
390 			 uint32_t flags, int lvblen)
391 {
392 	struct dlm_ls *ls;
393 	int i, size, error;
394 	int do_unreg = 0;
395 
396 	if (namelen > DLM_LOCKSPACE_LEN)
397 		return -EINVAL;
398 
399 	if (!lvblen || (lvblen % 8))
400 		return -EINVAL;
401 
402 	if (!try_module_get(THIS_MODULE))
403 		return -EINVAL;
404 
405 	if (!dlm_user_daemon_available()) {
406 		module_put(THIS_MODULE);
407 		return -EUNATCH;
408 	}
409 
410 	error = 0;
411 
412 	spin_lock(&lslist_lock);
413 	list_for_each_entry(ls, &lslist, ls_list) {
414 		WARN_ON(ls->ls_create_count <= 0);
415 		if (ls->ls_namelen != namelen)
416 			continue;
417 		if (memcmp(ls->ls_name, name, namelen))
418 			continue;
419 		if (flags & DLM_LSFL_NEWEXCL) {
420 			error = -EEXIST;
421 			break;
422 		}
423 		ls->ls_create_count++;
424 		*lockspace = ls;
425 		error = 1;
426 		break;
427 	}
428 	spin_unlock(&lslist_lock);
429 
430 	if (error)
431 		goto out;
432 
433 	error = -ENOMEM;
434 
435 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
436 	if (!ls)
437 		goto out;
438 	memcpy(ls->ls_name, name, namelen);
439 	ls->ls_namelen = namelen;
440 	ls->ls_lvblen = lvblen;
441 	ls->ls_count = 0;
442 	ls->ls_flags = 0;
443 	ls->ls_scan_time = jiffies;
444 
445 	if (flags & DLM_LSFL_TIMEWARN)
446 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
447 
448 	/* ls_exflags are forced to match among nodes, and we don't
449 	   need to require all nodes to have some flags set */
450 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
451 				    DLM_LSFL_NEWEXCL));
452 
453 	size = dlm_config.ci_rsbtbl_size;
454 	ls->ls_rsbtbl_size = size;
455 
456 	ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
457 	if (!ls->ls_rsbtbl)
458 		goto out_lsfree;
459 	for (i = 0; i < size; i++) {
460 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
461 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
462 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
463 	}
464 
465 	idr_init(&ls->ls_lkbidr);
466 	spin_lock_init(&ls->ls_lkbidr_spin);
467 
468 	size = dlm_config.ci_dirtbl_size;
469 	ls->ls_dirtbl_size = size;
470 
471 	ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size);
472 	if (!ls->ls_dirtbl)
473 		goto out_lkbfree;
474 	for (i = 0; i < size; i++) {
475 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
476 		spin_lock_init(&ls->ls_dirtbl[i].lock);
477 	}
478 
479 	INIT_LIST_HEAD(&ls->ls_waiters);
480 	mutex_init(&ls->ls_waiters_mutex);
481 	INIT_LIST_HEAD(&ls->ls_orphans);
482 	mutex_init(&ls->ls_orphans_mutex);
483 	INIT_LIST_HEAD(&ls->ls_timeout);
484 	mutex_init(&ls->ls_timeout_mutex);
485 
486 	INIT_LIST_HEAD(&ls->ls_new_rsb);
487 	spin_lock_init(&ls->ls_new_rsb_spin);
488 
489 	INIT_LIST_HEAD(&ls->ls_nodes);
490 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
491 	ls->ls_num_nodes = 0;
492 	ls->ls_low_nodeid = 0;
493 	ls->ls_total_weight = 0;
494 	ls->ls_node_array = NULL;
495 
496 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
497 	ls->ls_stub_rsb.res_ls = ls;
498 
499 	ls->ls_debug_rsb_dentry = NULL;
500 	ls->ls_debug_waiters_dentry = NULL;
501 
502 	init_waitqueue_head(&ls->ls_uevent_wait);
503 	ls->ls_uevent_result = 0;
504 	init_completion(&ls->ls_members_done);
505 	ls->ls_members_result = -1;
506 
507 	mutex_init(&ls->ls_cb_mutex);
508 	INIT_LIST_HEAD(&ls->ls_cb_delay);
509 
510 	ls->ls_recoverd_task = NULL;
511 	mutex_init(&ls->ls_recoverd_active);
512 	spin_lock_init(&ls->ls_recover_lock);
513 	spin_lock_init(&ls->ls_rcom_spin);
514 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
515 	ls->ls_recover_status = 0;
516 	ls->ls_recover_seq = 0;
517 	ls->ls_recover_args = NULL;
518 	init_rwsem(&ls->ls_in_recovery);
519 	init_rwsem(&ls->ls_recv_active);
520 	INIT_LIST_HEAD(&ls->ls_requestqueue);
521 	mutex_init(&ls->ls_requestqueue_mutex);
522 	mutex_init(&ls->ls_clear_proc_locks);
523 
524 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
525 	if (!ls->ls_recover_buf)
526 		goto out_dirfree;
527 
528 	INIT_LIST_HEAD(&ls->ls_recover_list);
529 	spin_lock_init(&ls->ls_recover_list_lock);
530 	ls->ls_recover_list_count = 0;
531 	ls->ls_local_handle = ls;
532 	init_waitqueue_head(&ls->ls_wait_general);
533 	INIT_LIST_HEAD(&ls->ls_root_list);
534 	init_rwsem(&ls->ls_root_sem);
535 
536 	down_write(&ls->ls_in_recovery);
537 
538 	spin_lock(&lslist_lock);
539 	ls->ls_create_count = 1;
540 	list_add(&ls->ls_list, &lslist);
541 	spin_unlock(&lslist_lock);
542 
543 	if (flags & DLM_LSFL_FS) {
544 		error = dlm_callback_start(ls);
545 		if (error) {
546 			log_error(ls, "can't start dlm_callback %d", error);
547 			goto out_delist;
548 		}
549 	}
550 
551 	/* needs to find ls in lslist */
552 	error = dlm_recoverd_start(ls);
553 	if (error) {
554 		log_error(ls, "can't start dlm_recoverd %d", error);
555 		goto out_callback;
556 	}
557 
558 	ls->ls_kobj.kset = dlm_kset;
559 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
560 				     "%s", ls->ls_name);
561 	if (error)
562 		goto out_recoverd;
563 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
564 
565 	/* let kobject handle freeing of ls if there's an error */
566 	do_unreg = 1;
567 
568 	/* This uevent triggers dlm_controld in userspace to add us to the
569 	   group of nodes that are members of this lockspace (managed by the
570 	   cluster infrastructure.)  Once it's done that, it tells us who the
571 	   current lockspace members are (via configfs) and then tells the
572 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
573 
574 	error = do_uevent(ls, 1);
575 	if (error)
576 		goto out_recoverd;
577 
578 	wait_for_completion(&ls->ls_members_done);
579 	error = ls->ls_members_result;
580 	if (error)
581 		goto out_members;
582 
583 	dlm_create_debug_file(ls);
584 
585 	log_debug(ls, "join complete");
586 	*lockspace = ls;
587 	return 0;
588 
589  out_members:
590 	do_uevent(ls, 0);
591 	dlm_clear_members(ls);
592 	kfree(ls->ls_node_array);
593  out_recoverd:
594 	dlm_recoverd_stop(ls);
595  out_callback:
596 	dlm_callback_stop(ls);
597  out_delist:
598 	spin_lock(&lslist_lock);
599 	list_del(&ls->ls_list);
600 	spin_unlock(&lslist_lock);
601 	kfree(ls->ls_recover_buf);
602  out_dirfree:
603 	vfree(ls->ls_dirtbl);
604  out_lkbfree:
605 	idr_destroy(&ls->ls_lkbidr);
606 	vfree(ls->ls_rsbtbl);
607  out_lsfree:
608 	if (do_unreg)
609 		kobject_put(&ls->ls_kobj);
610 	else
611 		kfree(ls);
612  out:
613 	module_put(THIS_MODULE);
614 	return error;
615 }
616 
617 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
618 		      uint32_t flags, int lvblen)
619 {
620 	int error = 0;
621 
622 	mutex_lock(&ls_lock);
623 	if (!ls_count)
624 		error = threads_start();
625 	if (error)
626 		goto out;
627 
628 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
629 	if (!error)
630 		ls_count++;
631 	if (error > 0)
632 		error = 0;
633 	if (!ls_count)
634 		threads_stop();
635  out:
636 	mutex_unlock(&ls_lock);
637 	return error;
638 }
639 
640 static int lkb_idr_is_local(int id, void *p, void *data)
641 {
642 	struct dlm_lkb *lkb = p;
643 
644 	if (!lkb->lkb_nodeid)
645 		return 1;
646 	return 0;
647 }
648 
/*
 * idr_for_each() callback that returns 1 for every entry, whatever the
 * arguments.  Used by lockspace_busy() for force == 0.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	const int present = 1;

	return present;
}
653 
654 static int lkb_idr_free(int id, void *p, void *data)
655 {
656 	struct dlm_lkb *lkb = p;
657 
658 	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
659 		dlm_free_lvb(lkb->lkb_lvbptr);
660 
661 	dlm_free_lkb(lkb);
662 	return 0;
663 }
664 
665 /* NOTE: We check the lkbidr here rather than the resource table.
666    This is because there may be LKBs queued as ASTs that have been unlinked
667    from their RSBs and are pending deletion once the AST has been delivered */
668 
669 static int lockspace_busy(struct dlm_ls *ls, int force)
670 {
671 	int rv;
672 
673 	spin_lock(&ls->ls_lkbidr_spin);
674 	if (force == 0) {
675 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
676 	} else if (force == 1) {
677 		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
678 	} else {
679 		rv = 0;
680 	}
681 	spin_unlock(&ls->ls_lkbidr_spin);
682 	return rv;
683 }
684 
685 static int release_lockspace(struct dlm_ls *ls, int force)
686 {
687 	struct dlm_rsb *rsb;
688 	struct list_head *head;
689 	int i, busy, rv;
690 
691 	busy = lockspace_busy(ls, force);
692 
693 	spin_lock(&lslist_lock);
694 	if (ls->ls_create_count == 1) {
695 		if (busy) {
696 			rv = -EBUSY;
697 		} else {
698 			/* remove_lockspace takes ls off lslist */
699 			ls->ls_create_count = 0;
700 			rv = 0;
701 		}
702 	} else if (ls->ls_create_count > 1) {
703 		rv = --ls->ls_create_count;
704 	} else {
705 		rv = -EINVAL;
706 	}
707 	spin_unlock(&lslist_lock);
708 
709 	if (rv) {
710 		log_debug(ls, "release_lockspace no remove %d", rv);
711 		return rv;
712 	}
713 
714 	dlm_device_deregister(ls);
715 
716 	if (force < 3 && dlm_user_daemon_available())
717 		do_uevent(ls, 0);
718 
719 	dlm_recoverd_stop(ls);
720 
721 	dlm_callback_stop(ls);
722 
723 	remove_lockspace(ls);
724 
725 	dlm_delete_debug_file(ls);
726 
727 	kfree(ls->ls_recover_buf);
728 
729 	/*
730 	 * Free direntry structs.
731 	 */
732 
733 	dlm_dir_clear(ls);
734 	vfree(ls->ls_dirtbl);
735 
736 	/*
737 	 * Free all lkb's in idr
738 	 */
739 
740 	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
741 	idr_remove_all(&ls->ls_lkbidr);
742 	idr_destroy(&ls->ls_lkbidr);
743 
744 	/*
745 	 * Free all rsb's on rsbtbl[] lists
746 	 */
747 
748 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
749 		head = &ls->ls_rsbtbl[i].list;
750 		while (!list_empty(head)) {
751 			rsb = list_entry(head->next, struct dlm_rsb,
752 					 res_hashchain);
753 
754 			list_del(&rsb->res_hashchain);
755 			dlm_free_rsb(rsb);
756 		}
757 
758 		head = &ls->ls_rsbtbl[i].toss;
759 		while (!list_empty(head)) {
760 			rsb = list_entry(head->next, struct dlm_rsb,
761 					 res_hashchain);
762 			list_del(&rsb->res_hashchain);
763 			dlm_free_rsb(rsb);
764 		}
765 	}
766 
767 	vfree(ls->ls_rsbtbl);
768 
769 	while (!list_empty(&ls->ls_new_rsb)) {
770 		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
771 				       res_hashchain);
772 		list_del(&rsb->res_hashchain);
773 		dlm_free_rsb(rsb);
774 	}
775 
776 	/*
777 	 * Free structures on any other lists
778 	 */
779 
780 	dlm_purge_requestqueue(ls);
781 	kfree(ls->ls_recover_args);
782 	dlm_clear_free_entries(ls);
783 	dlm_clear_members(ls);
784 	dlm_clear_members_gone(ls);
785 	kfree(ls->ls_node_array);
786 	log_debug(ls, "release_lockspace final free");
787 	kobject_put(&ls->ls_kobj);
788 	/* The ls structure will be freed when the kobject is done with */
789 
790 	module_put(THIS_MODULE);
791 	return 0;
792 }
793 
794 /*
795  * Called when a system has released all its locks and is not going to use the
796  * lockspace any longer.  We free everything we're managing for this lockspace.
797  * Remaining nodes will go through the recovery process as if we'd died.  The
798  * lockspace must continue to function as usual, participating in recoveries,
799  * until this returns.
800  *
801  * Force has 4 possible values:
802  * 0 - don't destroy lockspace if it has any LKBs
803  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
804  * 2 - destroy lockspace regardless of LKBs
805  * 3 - destroy lockspace as part of a forced shutdown
806  */
807 
808 int dlm_release_lockspace(void *lockspace, int force)
809 {
810 	struct dlm_ls *ls;
811 	int error;
812 
813 	ls = dlm_find_lockspace_local(lockspace);
814 	if (!ls)
815 		return -EINVAL;
816 	dlm_put_lockspace(ls);
817 
818 	mutex_lock(&ls_lock);
819 	error = release_lockspace(ls, force);
820 	if (!error)
821 		ls_count--;
822 	if (!ls_count)
823 		threads_stop();
824 	mutex_unlock(&ls_lock);
825 
826 	return error;
827 }
828 
829 void dlm_stop_lockspaces(void)
830 {
831 	struct dlm_ls *ls;
832 
833  restart:
834 	spin_lock(&lslist_lock);
835 	list_for_each_entry(ls, &lslist, ls_list) {
836 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
837 			continue;
838 		spin_unlock(&lslist_lock);
839 		log_error(ls, "no userland control daemon, stopping lockspace");
840 		dlm_ls_stop(ls);
841 		goto restart;
842 	}
843 	spin_unlock(&lslist_lock);
844 }
845 
846