xref: /openbmc/linux/fs/dlm/lockspace.c (revision 96de0e252cedffad61b3cb5e05662c591898e69a)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 
27 #ifdef CONFIG_DLM_DEBUG
28 int dlm_create_debug_file(struct dlm_ls *ls);
29 void dlm_delete_debug_file(struct dlm_ls *ls);
30 #else
31 static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
32 static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
33 #endif
34 
35 static int			ls_count;
36 static struct mutex		ls_lock;
37 static struct list_head		lslist;
38 static spinlock_t		lslist_lock;
39 static struct task_struct *	scand_task;
40 
41 
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44 	ssize_t ret = len;
45 	int n = simple_strtol(buf, NULL, 0);
46 
47 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
48 	if (!ls)
49 		return -EINVAL;
50 
51 	switch (n) {
52 	case 0:
53 		dlm_ls_stop(ls);
54 		break;
55 	case 1:
56 		dlm_ls_start(ls);
57 		break;
58 	default:
59 		ret = -EINVAL;
60 	}
61 	dlm_put_lockspace(ls);
62 	return ret;
63 }
64 
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69 	wake_up(&ls->ls_uevent_wait);
70 	return len;
71 }
72 
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77 
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81 	return len;
82 }
83 
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86 	uint32_t status = dlm_recover_status(ls);
87 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89 
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94 
95 struct dlm_attr {
96 	struct attribute attr;
97 	ssize_t (*show)(struct dlm_ls *, char *);
98 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100 
101 static struct dlm_attr dlm_attr_control = {
102 	.attr  = {.name = "control", .mode = S_IWUSR},
103 	.store = dlm_control_store
104 };
105 
106 static struct dlm_attr dlm_attr_event = {
107 	.attr  = {.name = "event_done", .mode = S_IWUSR},
108 	.store = dlm_event_store
109 };
110 
111 static struct dlm_attr dlm_attr_id = {
112 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113 	.show  = dlm_id_show,
114 	.store = dlm_id_store
115 };
116 
117 static struct dlm_attr dlm_attr_recover_status = {
118 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
119 	.show  = dlm_recover_status_show
120 };
121 
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124 	.show  = dlm_recover_nodeid_show
125 };
126 
127 static struct attribute *dlm_attrs[] = {
128 	&dlm_attr_control.attr,
129 	&dlm_attr_event.attr,
130 	&dlm_attr_id.attr,
131 	&dlm_attr_recover_status.attr,
132 	&dlm_attr_recover_nodeid.attr,
133 	NULL,
134 };
135 
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137 			     char *buf)
138 {
139 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141 	return a->show ? a->show(ls, buf) : 0;
142 }
143 
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145 			      const char *buf, size_t len)
146 {
147 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149 	return a->store ? a->store(ls, buf, len) : len;
150 }
151 
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155 	kfree(ls);
156 }
157 
158 static struct sysfs_ops dlm_attr_ops = {
159 	.show  = dlm_attr_show,
160 	.store = dlm_attr_store,
161 };
162 
163 static struct kobj_type dlm_ktype = {
164 	.default_attrs = dlm_attrs,
165 	.sysfs_ops     = &dlm_attr_ops,
166 	.release       = lockspace_kobj_release,
167 };
168 
169 static struct kset dlm_kset = {
170 	.ktype  = &dlm_ktype,
171 };
172 
173 static int kobject_setup(struct dlm_ls *ls)
174 {
175 	char lsname[DLM_LOCKSPACE_LEN];
176 	int error;
177 
178 	memset(lsname, 0, DLM_LOCKSPACE_LEN);
179 	snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
180 
181 	error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
182 	if (error)
183 		return error;
184 
185 	ls->ls_kobj.kset = &dlm_kset;
186 	ls->ls_kobj.ktype = &dlm_ktype;
187 	return 0;
188 }
189 
190 static int do_uevent(struct dlm_ls *ls, int in)
191 {
192 	int error;
193 
194 	if (in)
195 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
196 	else
197 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
198 
199 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
200 
201 	/* dlm_controld will see the uevent, do the necessary group management
202 	   and then write to sysfs to wake us */
203 
204 	error = wait_event_interruptible(ls->ls_uevent_wait,
205 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
206 
207 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
208 
209 	if (error)
210 		goto out;
211 
212 	error = ls->ls_uevent_result;
213  out:
214 	if (error)
215 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
216 			  error, ls->ls_uevent_result);
217 	return error;
218 }
219 
220 
221 int dlm_lockspace_init(void)
222 {
223 	int error;
224 
225 	ls_count = 0;
226 	mutex_init(&ls_lock);
227 	INIT_LIST_HEAD(&lslist);
228 	spin_lock_init(&lslist_lock);
229 
230 	kobject_set_name(&dlm_kset.kobj, "dlm");
231 	kobj_set_kset_s(&dlm_kset, kernel_subsys);
232 	error = kset_register(&dlm_kset);
233 	if (error)
234 		printk("dlm_lockspace_init: cannot register kset %d\n", error);
235 	return error;
236 }
237 
238 void dlm_lockspace_exit(void)
239 {
240 	kset_unregister(&dlm_kset);
241 }
242 
243 static int dlm_scand(void *data)
244 {
245 	struct dlm_ls *ls;
246 
247 	while (!kthread_should_stop()) {
248 		list_for_each_entry(ls, &lslist, ls_list) {
249 			if (dlm_lock_recovery_try(ls)) {
250 				dlm_scan_rsbs(ls);
251 				dlm_scan_timeout(ls);
252 				dlm_unlock_recovery(ls);
253 			}
254 		}
255 		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
256 	}
257 	return 0;
258 }
259 
260 static int dlm_scand_start(void)
261 {
262 	struct task_struct *p;
263 	int error = 0;
264 
265 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
266 	if (IS_ERR(p))
267 		error = PTR_ERR(p);
268 	else
269 		scand_task = p;
270 	return error;
271 }
272 
273 static void dlm_scand_stop(void)
274 {
275 	kthread_stop(scand_task);
276 }
277 
278 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
279 {
280 	struct dlm_ls *ls;
281 
282 	spin_lock(&lslist_lock);
283 
284 	list_for_each_entry(ls, &lslist, ls_list) {
285 		if (ls->ls_namelen == namelen &&
286 		    memcmp(ls->ls_name, name, namelen) == 0)
287 			goto out;
288 	}
289 	ls = NULL;
290  out:
291 	spin_unlock(&lslist_lock);
292 	return ls;
293 }
294 
295 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
296 {
297 	struct dlm_ls *ls;
298 
299 	spin_lock(&lslist_lock);
300 
301 	list_for_each_entry(ls, &lslist, ls_list) {
302 		if (ls->ls_global_id == id) {
303 			ls->ls_count++;
304 			goto out;
305 		}
306 	}
307 	ls = NULL;
308  out:
309 	spin_unlock(&lslist_lock);
310 	return ls;
311 }
312 
313 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
314 {
315 	struct dlm_ls *ls;
316 
317 	spin_lock(&lslist_lock);
318 	list_for_each_entry(ls, &lslist, ls_list) {
319 		if (ls->ls_local_handle == lockspace) {
320 			ls->ls_count++;
321 			goto out;
322 		}
323 	}
324 	ls = NULL;
325  out:
326 	spin_unlock(&lslist_lock);
327 	return ls;
328 }
329 
330 struct dlm_ls *dlm_find_lockspace_device(int minor)
331 {
332 	struct dlm_ls *ls;
333 
334 	spin_lock(&lslist_lock);
335 	list_for_each_entry(ls, &lslist, ls_list) {
336 		if (ls->ls_device.minor == minor) {
337 			ls->ls_count++;
338 			goto out;
339 		}
340 	}
341 	ls = NULL;
342  out:
343 	spin_unlock(&lslist_lock);
344 	return ls;
345 }
346 
347 void dlm_put_lockspace(struct dlm_ls *ls)
348 {
349 	spin_lock(&lslist_lock);
350 	ls->ls_count--;
351 	spin_unlock(&lslist_lock);
352 }
353 
354 static void remove_lockspace(struct dlm_ls *ls)
355 {
356 	for (;;) {
357 		spin_lock(&lslist_lock);
358 		if (ls->ls_count == 0) {
359 			list_del(&ls->ls_list);
360 			spin_unlock(&lslist_lock);
361 			return;
362 		}
363 		spin_unlock(&lslist_lock);
364 		ssleep(1);
365 	}
366 }
367 
368 static int threads_start(void)
369 {
370 	int error;
371 
372 	/* Thread which process lock requests for all lockspace's */
373 	error = dlm_astd_start();
374 	if (error) {
375 		log_print("cannot start dlm_astd thread %d", error);
376 		goto fail;
377 	}
378 
379 	error = dlm_scand_start();
380 	if (error) {
381 		log_print("cannot start dlm_scand thread %d", error);
382 		goto astd_fail;
383 	}
384 
385 	/* Thread for sending/receiving messages for all lockspace's */
386 	error = dlm_lowcomms_start();
387 	if (error) {
388 		log_print("cannot start dlm lowcomms %d", error);
389 		goto scand_fail;
390 	}
391 
392 	return 0;
393 
394  scand_fail:
395 	dlm_scand_stop();
396  astd_fail:
397 	dlm_astd_stop();
398  fail:
399 	return error;
400 }
401 
402 static void threads_stop(void)
403 {
404 	dlm_scand_stop();
405 	dlm_lowcomms_stop();
406 	dlm_astd_stop();
407 }
408 
409 static int new_lockspace(char *name, int namelen, void **lockspace,
410 			 uint32_t flags, int lvblen)
411 {
412 	struct dlm_ls *ls;
413 	int i, size, error = -ENOMEM;
414 	int do_unreg = 0;
415 
416 	if (namelen > DLM_LOCKSPACE_LEN)
417 		return -EINVAL;
418 
419 	if (!lvblen || (lvblen % 8))
420 		return -EINVAL;
421 
422 	if (!try_module_get(THIS_MODULE))
423 		return -EINVAL;
424 
425 	ls = dlm_find_lockspace_name(name, namelen);
426 	if (ls) {
427 		*lockspace = ls;
428 		module_put(THIS_MODULE);
429 		return -EEXIST;
430 	}
431 
432 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
433 	if (!ls)
434 		goto out;
435 	memcpy(ls->ls_name, name, namelen);
436 	ls->ls_namelen = namelen;
437 	ls->ls_lvblen = lvblen;
438 	ls->ls_count = 0;
439 	ls->ls_flags = 0;
440 
441 	if (flags & DLM_LSFL_TIMEWARN)
442 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
443 
444 	if (flags & DLM_LSFL_FS)
445 		ls->ls_allocation = GFP_NOFS;
446 	else
447 		ls->ls_allocation = GFP_KERNEL;
448 
449 	/* ls_exflags are forced to match among nodes, and we don't
450 	   need to require all nodes to have TIMEWARN or FS set */
451 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));
452 
453 	size = dlm_config.ci_rsbtbl_size;
454 	ls->ls_rsbtbl_size = size;
455 
456 	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
457 	if (!ls->ls_rsbtbl)
458 		goto out_lsfree;
459 	for (i = 0; i < size; i++) {
460 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
461 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
462 		rwlock_init(&ls->ls_rsbtbl[i].lock);
463 	}
464 
465 	size = dlm_config.ci_lkbtbl_size;
466 	ls->ls_lkbtbl_size = size;
467 
468 	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
469 	if (!ls->ls_lkbtbl)
470 		goto out_rsbfree;
471 	for (i = 0; i < size; i++) {
472 		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
473 		rwlock_init(&ls->ls_lkbtbl[i].lock);
474 		ls->ls_lkbtbl[i].counter = 1;
475 	}
476 
477 	size = dlm_config.ci_dirtbl_size;
478 	ls->ls_dirtbl_size = size;
479 
480 	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
481 	if (!ls->ls_dirtbl)
482 		goto out_lkbfree;
483 	for (i = 0; i < size; i++) {
484 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
485 		rwlock_init(&ls->ls_dirtbl[i].lock);
486 	}
487 
488 	INIT_LIST_HEAD(&ls->ls_waiters);
489 	mutex_init(&ls->ls_waiters_mutex);
490 	INIT_LIST_HEAD(&ls->ls_orphans);
491 	mutex_init(&ls->ls_orphans_mutex);
492 	INIT_LIST_HEAD(&ls->ls_timeout);
493 	mutex_init(&ls->ls_timeout_mutex);
494 
495 	INIT_LIST_HEAD(&ls->ls_nodes);
496 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
497 	ls->ls_num_nodes = 0;
498 	ls->ls_low_nodeid = 0;
499 	ls->ls_total_weight = 0;
500 	ls->ls_node_array = NULL;
501 
502 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
503 	ls->ls_stub_rsb.res_ls = ls;
504 
505 	ls->ls_debug_rsb_dentry = NULL;
506 	ls->ls_debug_waiters_dentry = NULL;
507 
508 	init_waitqueue_head(&ls->ls_uevent_wait);
509 	ls->ls_uevent_result = 0;
510 	init_completion(&ls->ls_members_done);
511 	ls->ls_members_result = -1;
512 
513 	ls->ls_recoverd_task = NULL;
514 	mutex_init(&ls->ls_recoverd_active);
515 	spin_lock_init(&ls->ls_recover_lock);
516 	spin_lock_init(&ls->ls_rcom_spin);
517 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
518 	ls->ls_recover_status = 0;
519 	ls->ls_recover_seq = 0;
520 	ls->ls_recover_args = NULL;
521 	init_rwsem(&ls->ls_in_recovery);
522 	init_rwsem(&ls->ls_recv_active);
523 	INIT_LIST_HEAD(&ls->ls_requestqueue);
524 	mutex_init(&ls->ls_requestqueue_mutex);
525 	mutex_init(&ls->ls_clear_proc_locks);
526 
527 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
528 	if (!ls->ls_recover_buf)
529 		goto out_dirfree;
530 
531 	INIT_LIST_HEAD(&ls->ls_recover_list);
532 	spin_lock_init(&ls->ls_recover_list_lock);
533 	ls->ls_recover_list_count = 0;
534 	ls->ls_local_handle = ls;
535 	init_waitqueue_head(&ls->ls_wait_general);
536 	INIT_LIST_HEAD(&ls->ls_root_list);
537 	init_rwsem(&ls->ls_root_sem);
538 
539 	down_write(&ls->ls_in_recovery);
540 
541 	spin_lock(&lslist_lock);
542 	list_add(&ls->ls_list, &lslist);
543 	spin_unlock(&lslist_lock);
544 
545 	/* needs to find ls in lslist */
546 	error = dlm_recoverd_start(ls);
547 	if (error) {
548 		log_error(ls, "can't start dlm_recoverd %d", error);
549 		goto out_delist;
550 	}
551 
552 	error = kobject_setup(ls);
553 	if (error)
554 		goto out_stop;
555 
556 	error = kobject_register(&ls->ls_kobj);
557 	if (error)
558 		goto out_stop;
559 
560 	/* let kobject handle freeing of ls if there's an error */
561 	do_unreg = 1;
562 
563 	/* This uevent triggers dlm_controld in userspace to add us to the
564 	   group of nodes that are members of this lockspace (managed by the
565 	   cluster infrastructure.)  Once it's done that, it tells us who the
566 	   current lockspace members are (via configfs) and then tells the
567 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
568 
569 	error = do_uevent(ls, 1);
570 	if (error)
571 		goto out_stop;
572 
573 	wait_for_completion(&ls->ls_members_done);
574 	error = ls->ls_members_result;
575 	if (error)
576 		goto out_members;
577 
578 	dlm_create_debug_file(ls);
579 
580 	log_debug(ls, "join complete");
581 
582 	*lockspace = ls;
583 	return 0;
584 
585  out_members:
586 	do_uevent(ls, 0);
587 	dlm_clear_members(ls);
588 	kfree(ls->ls_node_array);
589  out_stop:
590 	dlm_recoverd_stop(ls);
591  out_delist:
592 	spin_lock(&lslist_lock);
593 	list_del(&ls->ls_list);
594 	spin_unlock(&lslist_lock);
595 	kfree(ls->ls_recover_buf);
596  out_dirfree:
597 	kfree(ls->ls_dirtbl);
598  out_lkbfree:
599 	kfree(ls->ls_lkbtbl);
600  out_rsbfree:
601 	kfree(ls->ls_rsbtbl);
602  out_lsfree:
603 	if (do_unreg)
604 		kobject_unregister(&ls->ls_kobj);
605 	else
606 		kfree(ls);
607  out:
608 	module_put(THIS_MODULE);
609 	return error;
610 }
611 
612 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
613 		      uint32_t flags, int lvblen)
614 {
615 	int error = 0;
616 
617 	mutex_lock(&ls_lock);
618 	if (!ls_count)
619 		error = threads_start();
620 	if (error)
621 		goto out;
622 
623 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
624 	if (!error)
625 		ls_count++;
626 	else if (!ls_count)
627 		threads_stop();
628  out:
629 	mutex_unlock(&ls_lock);
630 	return error;
631 }
632 
633 /* Return 1 if the lockspace still has active remote locks,
634  *        2 if the lockspace still has active local locks.
635  */
636 static int lockspace_busy(struct dlm_ls *ls)
637 {
638 	int i, lkb_found = 0;
639 	struct dlm_lkb *lkb;
640 
641 	/* NOTE: We check the lockidtbl here rather than the resource table.
642 	   This is because there may be LKBs queued as ASTs that have been
643 	   unlinked from their RSBs and are pending deletion once the AST has
644 	   been delivered */
645 
646 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
647 		read_lock(&ls->ls_lkbtbl[i].lock);
648 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
649 			lkb_found = 1;
650 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
651 					    lkb_idtbl_list) {
652 				if (!lkb->lkb_nodeid) {
653 					read_unlock(&ls->ls_lkbtbl[i].lock);
654 					return 2;
655 				}
656 			}
657 		}
658 		read_unlock(&ls->ls_lkbtbl[i].lock);
659 	}
660 	return lkb_found;
661 }
662 
663 static int release_lockspace(struct dlm_ls *ls, int force)
664 {
665 	struct dlm_lkb *lkb;
666 	struct dlm_rsb *rsb;
667 	struct list_head *head;
668 	int i;
669 	int busy = lockspace_busy(ls);
670 
671 	if (busy > force)
672 		return -EBUSY;
673 
674 	if (force < 3)
675 		do_uevent(ls, 0);
676 
677 	dlm_recoverd_stop(ls);
678 
679 	remove_lockspace(ls);
680 
681 	dlm_delete_debug_file(ls);
682 
683 	dlm_astd_suspend();
684 
685 	kfree(ls->ls_recover_buf);
686 
687 	/*
688 	 * Free direntry structs.
689 	 */
690 
691 	dlm_dir_clear(ls);
692 	kfree(ls->ls_dirtbl);
693 
694 	/*
695 	 * Free all lkb's on lkbtbl[] lists.
696 	 */
697 
698 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
699 		head = &ls->ls_lkbtbl[i].list;
700 		while (!list_empty(head)) {
701 			lkb = list_entry(head->next, struct dlm_lkb,
702 					 lkb_idtbl_list);
703 
704 			list_del(&lkb->lkb_idtbl_list);
705 
706 			dlm_del_ast(lkb);
707 
708 			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
709 				free_lvb(lkb->lkb_lvbptr);
710 
711 			free_lkb(lkb);
712 		}
713 	}
714 	dlm_astd_resume();
715 
716 	kfree(ls->ls_lkbtbl);
717 
718 	/*
719 	 * Free all rsb's on rsbtbl[] lists
720 	 */
721 
722 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
723 		head = &ls->ls_rsbtbl[i].list;
724 		while (!list_empty(head)) {
725 			rsb = list_entry(head->next, struct dlm_rsb,
726 					 res_hashchain);
727 
728 			list_del(&rsb->res_hashchain);
729 			free_rsb(rsb);
730 		}
731 
732 		head = &ls->ls_rsbtbl[i].toss;
733 		while (!list_empty(head)) {
734 			rsb = list_entry(head->next, struct dlm_rsb,
735 					 res_hashchain);
736 			list_del(&rsb->res_hashchain);
737 			free_rsb(rsb);
738 		}
739 	}
740 
741 	kfree(ls->ls_rsbtbl);
742 
743 	/*
744 	 * Free structures on any other lists
745 	 */
746 
747 	dlm_purge_requestqueue(ls);
748 	kfree(ls->ls_recover_args);
749 	dlm_clear_free_entries(ls);
750 	dlm_clear_members(ls);
751 	dlm_clear_members_gone(ls);
752 	kfree(ls->ls_node_array);
753 	kobject_unregister(&ls->ls_kobj);
754 	/* The ls structure will be freed when the kobject is done with */
755 
756 	mutex_lock(&ls_lock);
757 	ls_count--;
758 	if (!ls_count)
759 		threads_stop();
760 	mutex_unlock(&ls_lock);
761 
762 	module_put(THIS_MODULE);
763 	return 0;
764 }
765 
766 /*
767  * Called when a system has released all its locks and is not going to use the
768  * lockspace any longer.  We free everything we're managing for this lockspace.
769  * Remaining nodes will go through the recovery process as if we'd died.  The
770  * lockspace must continue to function as usual, participating in recoveries,
771  * until this returns.
772  *
773  * Force has 4 possible values:
774  * 0 - don't destroy locksapce if it has any LKBs
775  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
776  * 2 - destroy lockspace regardless of LKBs
777  * 3 - destroy lockspace as part of a forced shutdown
778  */
779 
780 int dlm_release_lockspace(void *lockspace, int force)
781 {
782 	struct dlm_ls *ls;
783 
784 	ls = dlm_find_lockspace_local(lockspace);
785 	if (!ls)
786 		return -EINVAL;
787 	dlm_put_lockspace(ls);
788 	return release_lockspace(ls, force);
789 }
790 
791