xref: /openbmc/linux/fs/dlm/lockspace.c (revision a09d2831)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 	uint32_t status = dlm_recover_status(ls);
80 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82 
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87 
88 struct dlm_attr {
89 	struct attribute attr;
90 	ssize_t (*show)(struct dlm_ls *, char *);
91 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93 
94 static struct dlm_attr dlm_attr_control = {
95 	.attr  = {.name = "control", .mode = S_IWUSR},
96 	.store = dlm_control_store
97 };
98 
99 static struct dlm_attr dlm_attr_event = {
100 	.attr  = {.name = "event_done", .mode = S_IWUSR},
101 	.store = dlm_event_store
102 };
103 
104 static struct dlm_attr dlm_attr_id = {
105 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 	.show  = dlm_id_show,
107 	.store = dlm_id_store
108 };
109 
110 static struct dlm_attr dlm_attr_recover_status = {
111 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112 	.show  = dlm_recover_status_show
113 };
114 
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117 	.show  = dlm_recover_nodeid_show
118 };
119 
120 static struct attribute *dlm_attrs[] = {
121 	&dlm_attr_control.attr,
122 	&dlm_attr_event.attr,
123 	&dlm_attr_id.attr,
124 	&dlm_attr_recover_status.attr,
125 	&dlm_attr_recover_nodeid.attr,
126 	NULL,
127 };
128 
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 			     char *buf)
131 {
132 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 	return a->show ? a->show(ls, buf) : 0;
135 }
136 
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 			      const char *buf, size_t len)
139 {
140 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 	return a->store ? a->store(ls, buf, len) : len;
143 }
144 
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148 	kfree(ls);
149 }
150 
151 static struct sysfs_ops dlm_attr_ops = {
152 	.show  = dlm_attr_show,
153 	.store = dlm_attr_store,
154 };
155 
156 static struct kobj_type dlm_ktype = {
157 	.default_attrs = dlm_attrs,
158 	.sysfs_ops     = &dlm_attr_ops,
159 	.release       = lockspace_kobj_release,
160 };
161 
162 static struct kset *dlm_kset;
163 
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 	int error;
167 
168 	if (in)
169 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 	else
171 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 
173 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 
175 	/* dlm_controld will see the uevent, do the necessary group management
176 	   and then write to sysfs to wake us */
177 
178 	error = wait_event_interruptible(ls->ls_uevent_wait,
179 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 
181 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 
183 	if (error)
184 		goto out;
185 
186 	error = ls->ls_uevent_result;
187  out:
188 	if (error)
189 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 			  error, ls->ls_uevent_result);
191 	return error;
192 }
193 
194 
195 int __init dlm_lockspace_init(void)
196 {
197 	ls_count = 0;
198 	mutex_init(&ls_lock);
199 	INIT_LIST_HEAD(&lslist);
200 	spin_lock_init(&lslist_lock);
201 
202 	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203 	if (!dlm_kset) {
204 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
205 		return -ENOMEM;
206 	}
207 	return 0;
208 }
209 
210 void dlm_lockspace_exit(void)
211 {
212 	kset_unregister(dlm_kset);
213 }
214 
215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217 	struct dlm_ls *ls;
218 
219 	spin_lock(&lslist_lock);
220 	list_for_each_entry(ls, &lslist, ls_list) {
221 		if (time_after_eq(jiffies, ls->ls_scan_time +
222 					    dlm_config.ci_scan_secs * HZ)) {
223 			spin_unlock(&lslist_lock);
224 			return ls;
225 		}
226 	}
227 	spin_unlock(&lslist_lock);
228 	return NULL;
229 }
230 
231 static int dlm_scand(void *data)
232 {
233 	struct dlm_ls *ls;
234 	int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235 
236 	while (!kthread_should_stop()) {
237 		ls = find_ls_to_scan();
238 		if (ls) {
239 			if (dlm_lock_recovery_try(ls)) {
240 				ls->ls_scan_time = jiffies;
241 				dlm_scan_rsbs(ls);
242 				dlm_scan_timeout(ls);
243 				dlm_unlock_recovery(ls);
244 			} else {
245 				ls->ls_scan_time += HZ;
246 			}
247 		} else {
248 			schedule_timeout_interruptible(timeout_jiffies);
249 		}
250 	}
251 	return 0;
252 }
253 
254 static int dlm_scand_start(void)
255 {
256 	struct task_struct *p;
257 	int error = 0;
258 
259 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
260 	if (IS_ERR(p))
261 		error = PTR_ERR(p);
262 	else
263 		scand_task = p;
264 	return error;
265 }
266 
267 static void dlm_scand_stop(void)
268 {
269 	kthread_stop(scand_task);
270 }
271 
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
273 {
274 	struct dlm_ls *ls;
275 
276 	spin_lock(&lslist_lock);
277 
278 	list_for_each_entry(ls, &lslist, ls_list) {
279 		if (ls->ls_global_id == id) {
280 			ls->ls_count++;
281 			goto out;
282 		}
283 	}
284 	ls = NULL;
285  out:
286 	spin_unlock(&lslist_lock);
287 	return ls;
288 }
289 
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
291 {
292 	struct dlm_ls *ls;
293 
294 	spin_lock(&lslist_lock);
295 	list_for_each_entry(ls, &lslist, ls_list) {
296 		if (ls->ls_local_handle == lockspace) {
297 			ls->ls_count++;
298 			goto out;
299 		}
300 	}
301 	ls = NULL;
302  out:
303 	spin_unlock(&lslist_lock);
304 	return ls;
305 }
306 
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309 	struct dlm_ls *ls;
310 
311 	spin_lock(&lslist_lock);
312 	list_for_each_entry(ls, &lslist, ls_list) {
313 		if (ls->ls_device.minor == minor) {
314 			ls->ls_count++;
315 			goto out;
316 		}
317 	}
318 	ls = NULL;
319  out:
320 	spin_unlock(&lslist_lock);
321 	return ls;
322 }
323 
324 void dlm_put_lockspace(struct dlm_ls *ls)
325 {
326 	spin_lock(&lslist_lock);
327 	ls->ls_count--;
328 	spin_unlock(&lslist_lock);
329 }
330 
331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333 	for (;;) {
334 		spin_lock(&lslist_lock);
335 		if (ls->ls_count == 0) {
336 			WARN_ON(ls->ls_create_count != 0);
337 			list_del(&ls->ls_list);
338 			spin_unlock(&lslist_lock);
339 			return;
340 		}
341 		spin_unlock(&lslist_lock);
342 		ssleep(1);
343 	}
344 }
345 
/*
 * Start the threads shared by all lockspaces, in order: dlm_astd, dlm_scand
 * and lowcomms.  If a later one fails, the ones already started are stopped
 * again in reverse order.
 *
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int rv;

	/* Thread which process lock requests for all lockspace's */
	rv = dlm_astd_start();
	if (rv) {
		log_print("cannot start dlm_astd thread %d", rv);
		return rv;
	}

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		dlm_astd_stop();
		return rv;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		dlm_astd_stop();
		return rv;
	}

	return 0;
}
379 
/* Stop the shared threads: dlm_scand first, then lowcomms, then dlm_astd. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
386 
387 static int new_lockspace(const char *name, int namelen, void **lockspace,
388 			 uint32_t flags, int lvblen)
389 {
390 	struct dlm_ls *ls;
391 	int i, size, error;
392 	int do_unreg = 0;
393 
394 	if (namelen > DLM_LOCKSPACE_LEN)
395 		return -EINVAL;
396 
397 	if (!lvblen || (lvblen % 8))
398 		return -EINVAL;
399 
400 	if (!try_module_get(THIS_MODULE))
401 		return -EINVAL;
402 
403 	if (!dlm_user_daemon_available()) {
404 		module_put(THIS_MODULE);
405 		return -EUNATCH;
406 	}
407 
408 	error = 0;
409 
410 	spin_lock(&lslist_lock);
411 	list_for_each_entry(ls, &lslist, ls_list) {
412 		WARN_ON(ls->ls_create_count <= 0);
413 		if (ls->ls_namelen != namelen)
414 			continue;
415 		if (memcmp(ls->ls_name, name, namelen))
416 			continue;
417 		if (flags & DLM_LSFL_NEWEXCL) {
418 			error = -EEXIST;
419 			break;
420 		}
421 		ls->ls_create_count++;
422 		*lockspace = ls;
423 		error = 1;
424 		break;
425 	}
426 	spin_unlock(&lslist_lock);
427 
428 	if (error)
429 		goto out;
430 
431 	error = -ENOMEM;
432 
433 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
434 	if (!ls)
435 		goto out;
436 	memcpy(ls->ls_name, name, namelen);
437 	ls->ls_namelen = namelen;
438 	ls->ls_lvblen = lvblen;
439 	ls->ls_count = 0;
440 	ls->ls_flags = 0;
441 	ls->ls_scan_time = jiffies;
442 
443 	if (flags & DLM_LSFL_TIMEWARN)
444 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
445 
446 	/* ls_exflags are forced to match among nodes, and we don't
447 	   need to require all nodes to have some flags set */
448 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
449 				    DLM_LSFL_NEWEXCL));
450 
451 	size = dlm_config.ci_rsbtbl_size;
452 	ls->ls_rsbtbl_size = size;
453 
454 	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
455 	if (!ls->ls_rsbtbl)
456 		goto out_lsfree;
457 	for (i = 0; i < size; i++) {
458 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
459 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
460 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
461 	}
462 
463 	size = dlm_config.ci_lkbtbl_size;
464 	ls->ls_lkbtbl_size = size;
465 
466 	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
467 	if (!ls->ls_lkbtbl)
468 		goto out_rsbfree;
469 	for (i = 0; i < size; i++) {
470 		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
471 		rwlock_init(&ls->ls_lkbtbl[i].lock);
472 		ls->ls_lkbtbl[i].counter = 1;
473 	}
474 
475 	size = dlm_config.ci_dirtbl_size;
476 	ls->ls_dirtbl_size = size;
477 
478 	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
479 	if (!ls->ls_dirtbl)
480 		goto out_lkbfree;
481 	for (i = 0; i < size; i++) {
482 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
483 		spin_lock_init(&ls->ls_dirtbl[i].lock);
484 	}
485 
486 	INIT_LIST_HEAD(&ls->ls_waiters);
487 	mutex_init(&ls->ls_waiters_mutex);
488 	INIT_LIST_HEAD(&ls->ls_orphans);
489 	mutex_init(&ls->ls_orphans_mutex);
490 	INIT_LIST_HEAD(&ls->ls_timeout);
491 	mutex_init(&ls->ls_timeout_mutex);
492 
493 	INIT_LIST_HEAD(&ls->ls_nodes);
494 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
495 	ls->ls_num_nodes = 0;
496 	ls->ls_low_nodeid = 0;
497 	ls->ls_total_weight = 0;
498 	ls->ls_node_array = NULL;
499 
500 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
501 	ls->ls_stub_rsb.res_ls = ls;
502 
503 	ls->ls_debug_rsb_dentry = NULL;
504 	ls->ls_debug_waiters_dentry = NULL;
505 
506 	init_waitqueue_head(&ls->ls_uevent_wait);
507 	ls->ls_uevent_result = 0;
508 	init_completion(&ls->ls_members_done);
509 	ls->ls_members_result = -1;
510 
511 	ls->ls_recoverd_task = NULL;
512 	mutex_init(&ls->ls_recoverd_active);
513 	spin_lock_init(&ls->ls_recover_lock);
514 	spin_lock_init(&ls->ls_rcom_spin);
515 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
516 	ls->ls_recover_status = 0;
517 	ls->ls_recover_seq = 0;
518 	ls->ls_recover_args = NULL;
519 	init_rwsem(&ls->ls_in_recovery);
520 	init_rwsem(&ls->ls_recv_active);
521 	INIT_LIST_HEAD(&ls->ls_requestqueue);
522 	mutex_init(&ls->ls_requestqueue_mutex);
523 	mutex_init(&ls->ls_clear_proc_locks);
524 
525 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
526 	if (!ls->ls_recover_buf)
527 		goto out_dirfree;
528 
529 	INIT_LIST_HEAD(&ls->ls_recover_list);
530 	spin_lock_init(&ls->ls_recover_list_lock);
531 	ls->ls_recover_list_count = 0;
532 	ls->ls_local_handle = ls;
533 	init_waitqueue_head(&ls->ls_wait_general);
534 	INIT_LIST_HEAD(&ls->ls_root_list);
535 	init_rwsem(&ls->ls_root_sem);
536 
537 	down_write(&ls->ls_in_recovery);
538 
539 	spin_lock(&lslist_lock);
540 	ls->ls_create_count = 1;
541 	list_add(&ls->ls_list, &lslist);
542 	spin_unlock(&lslist_lock);
543 
544 	/* needs to find ls in lslist */
545 	error = dlm_recoverd_start(ls);
546 	if (error) {
547 		log_error(ls, "can't start dlm_recoverd %d", error);
548 		goto out_delist;
549 	}
550 
551 	ls->ls_kobj.kset = dlm_kset;
552 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
553 				     "%s", ls->ls_name);
554 	if (error)
555 		goto out_stop;
556 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
557 
558 	/* let kobject handle freeing of ls if there's an error */
559 	do_unreg = 1;
560 
561 	/* This uevent triggers dlm_controld in userspace to add us to the
562 	   group of nodes that are members of this lockspace (managed by the
563 	   cluster infrastructure.)  Once it's done that, it tells us who the
564 	   current lockspace members are (via configfs) and then tells the
565 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
566 
567 	error = do_uevent(ls, 1);
568 	if (error)
569 		goto out_stop;
570 
571 	wait_for_completion(&ls->ls_members_done);
572 	error = ls->ls_members_result;
573 	if (error)
574 		goto out_members;
575 
576 	dlm_create_debug_file(ls);
577 
578 	log_debug(ls, "join complete");
579 	*lockspace = ls;
580 	return 0;
581 
582  out_members:
583 	do_uevent(ls, 0);
584 	dlm_clear_members(ls);
585 	kfree(ls->ls_node_array);
586  out_stop:
587 	dlm_recoverd_stop(ls);
588  out_delist:
589 	spin_lock(&lslist_lock);
590 	list_del(&ls->ls_list);
591 	spin_unlock(&lslist_lock);
592 	kfree(ls->ls_recover_buf);
593  out_dirfree:
594 	kfree(ls->ls_dirtbl);
595  out_lkbfree:
596 	kfree(ls->ls_lkbtbl);
597  out_rsbfree:
598 	kfree(ls->ls_rsbtbl);
599  out_lsfree:
600 	if (do_unreg)
601 		kobject_put(&ls->ls_kobj);
602 	else
603 		kfree(ls);
604  out:
605 	module_put(THIS_MODULE);
606 	return error;
607 }
608 
609 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
610 		      uint32_t flags, int lvblen)
611 {
612 	int error = 0;
613 
614 	mutex_lock(&ls_lock);
615 	if (!ls_count)
616 		error = threads_start();
617 	if (error)
618 		goto out;
619 
620 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
621 	if (!error)
622 		ls_count++;
623 	if (error > 0)
624 		error = 0;
625 	if (!ls_count)
626 		threads_stop();
627  out:
628 	mutex_unlock(&ls_lock);
629 	return error;
630 }
631 
632 /* Return 1 if the lockspace still has active remote locks,
633  *        2 if the lockspace still has active local locks.
634  */
635 static int lockspace_busy(struct dlm_ls *ls)
636 {
637 	int i, lkb_found = 0;
638 	struct dlm_lkb *lkb;
639 
640 	/* NOTE: We check the lockidtbl here rather than the resource table.
641 	   This is because there may be LKBs queued as ASTs that have been
642 	   unlinked from their RSBs and are pending deletion once the AST has
643 	   been delivered */
644 
645 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
646 		read_lock(&ls->ls_lkbtbl[i].lock);
647 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
648 			lkb_found = 1;
649 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
650 					    lkb_idtbl_list) {
651 				if (!lkb->lkb_nodeid) {
652 					read_unlock(&ls->ls_lkbtbl[i].lock);
653 					return 2;
654 				}
655 			}
656 		}
657 		read_unlock(&ls->ls_lkbtbl[i].lock);
658 	}
659 	return lkb_found;
660 }
661 
662 static int release_lockspace(struct dlm_ls *ls, int force)
663 {
664 	struct dlm_lkb *lkb;
665 	struct dlm_rsb *rsb;
666 	struct list_head *head;
667 	int i, busy, rv;
668 
669 	busy = lockspace_busy(ls);
670 
671 	spin_lock(&lslist_lock);
672 	if (ls->ls_create_count == 1) {
673 		if (busy > force)
674 			rv = -EBUSY;
675 		else {
676 			/* remove_lockspace takes ls off lslist */
677 			ls->ls_create_count = 0;
678 			rv = 0;
679 		}
680 	} else if (ls->ls_create_count > 1) {
681 		rv = --ls->ls_create_count;
682 	} else {
683 		rv = -EINVAL;
684 	}
685 	spin_unlock(&lslist_lock);
686 
687 	if (rv) {
688 		log_debug(ls, "release_lockspace no remove %d", rv);
689 		return rv;
690 	}
691 
692 	dlm_device_deregister(ls);
693 
694 	if (force < 3 && dlm_user_daemon_available())
695 		do_uevent(ls, 0);
696 
697 	dlm_recoverd_stop(ls);
698 
699 	remove_lockspace(ls);
700 
701 	dlm_delete_debug_file(ls);
702 
703 	dlm_astd_suspend();
704 
705 	kfree(ls->ls_recover_buf);
706 
707 	/*
708 	 * Free direntry structs.
709 	 */
710 
711 	dlm_dir_clear(ls);
712 	kfree(ls->ls_dirtbl);
713 
714 	/*
715 	 * Free all lkb's on lkbtbl[] lists.
716 	 */
717 
718 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
719 		head = &ls->ls_lkbtbl[i].list;
720 		while (!list_empty(head)) {
721 			lkb = list_entry(head->next, struct dlm_lkb,
722 					 lkb_idtbl_list);
723 
724 			list_del(&lkb->lkb_idtbl_list);
725 
726 			dlm_del_ast(lkb);
727 
728 			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
729 				dlm_free_lvb(lkb->lkb_lvbptr);
730 
731 			dlm_free_lkb(lkb);
732 		}
733 	}
734 	dlm_astd_resume();
735 
736 	kfree(ls->ls_lkbtbl);
737 
738 	/*
739 	 * Free all rsb's on rsbtbl[] lists
740 	 */
741 
742 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
743 		head = &ls->ls_rsbtbl[i].list;
744 		while (!list_empty(head)) {
745 			rsb = list_entry(head->next, struct dlm_rsb,
746 					 res_hashchain);
747 
748 			list_del(&rsb->res_hashchain);
749 			dlm_free_rsb(rsb);
750 		}
751 
752 		head = &ls->ls_rsbtbl[i].toss;
753 		while (!list_empty(head)) {
754 			rsb = list_entry(head->next, struct dlm_rsb,
755 					 res_hashchain);
756 			list_del(&rsb->res_hashchain);
757 			dlm_free_rsb(rsb);
758 		}
759 	}
760 
761 	kfree(ls->ls_rsbtbl);
762 
763 	/*
764 	 * Free structures on any other lists
765 	 */
766 
767 	dlm_purge_requestqueue(ls);
768 	kfree(ls->ls_recover_args);
769 	dlm_clear_free_entries(ls);
770 	dlm_clear_members(ls);
771 	dlm_clear_members_gone(ls);
772 	kfree(ls->ls_node_array);
773 	log_debug(ls, "release_lockspace final free");
774 	kobject_put(&ls->ls_kobj);
775 	/* The ls structure will be freed when the kobject is done with */
776 
777 	module_put(THIS_MODULE);
778 	return 0;
779 }
780 
781 /*
782  * Called when a system has released all its locks and is not going to use the
783  * lockspace any longer.  We free everything we're managing for this lockspace.
784  * Remaining nodes will go through the recovery process as if we'd died.  The
785  * lockspace must continue to function as usual, participating in recoveries,
786  * until this returns.
787  *
788  * Force has 4 possible values:
789  * 0 - don't destroy locksapce if it has any LKBs
790  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
791  * 2 - destroy lockspace regardless of LKBs
792  * 3 - destroy lockspace as part of a forced shutdown
793  */
794 
795 int dlm_release_lockspace(void *lockspace, int force)
796 {
797 	struct dlm_ls *ls;
798 	int error;
799 
800 	ls = dlm_find_lockspace_local(lockspace);
801 	if (!ls)
802 		return -EINVAL;
803 	dlm_put_lockspace(ls);
804 
805 	mutex_lock(&ls_lock);
806 	error = release_lockspace(ls, force);
807 	if (!error)
808 		ls_count--;
809 	if (!ls_count)
810 		threads_stop();
811 	mutex_unlock(&ls_lock);
812 
813 	return error;
814 }
815 
816 void dlm_stop_lockspaces(void)
817 {
818 	struct dlm_ls *ls;
819 
820  restart:
821 	spin_lock(&lslist_lock);
822 	list_for_each_entry(ls, &lslist, ls_list) {
823 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
824 			continue;
825 		spin_unlock(&lslist_lock);
826 		log_error(ls, "no userland control daemon, stopping lockspace");
827 		dlm_ls_stop(ls);
828 		goto restart;
829 	}
830 	spin_unlock(&lslist_lock);
831 }
832 
833