/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n = simple_strtol(buf, NULL, 0);

	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}
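
/* Illustration only, not part of the original file: dlm_control_store() is
 * driven from userspace (normally dlm_controld) through the per-lockspace
 * sysfs file registered below under the "dlm" kset, i.e. a path of
 * /sys/kernel/dlm/<lockspace-name>/:
 *
 *	echo 0 > /sys/kernel/dlm/<name>/control		stops the lockspace
 *	echo 1 > /sys/kernel/dlm/<name>/control		starts the lockspace
 */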

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}
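
/* Illustration only, not part of the original file: a minimal sketch of the
 * userspace half of the handshake above.  A daemon such as dlm_controld
 * receives the ONLINE/OFFLINE uevent, performs the group management, then
 * reports the result through the "event_done" sysfs file, which lands in
 * dlm_event_store() and wakes the wait in do_uevent().  The helper name is
 * hypothetical; the path follows from the "dlm" kset under kernel_kobj.
 *
 *	#include <stdio.h>
 *
 *	static int signal_event_done(const char *ls_name, int result)
 *	{
 *		char path[256];
 *		FILE *f;
 *
 *		snprintf(path, sizeof(path),
 *			 "/sys/kernel/dlm/%s/event_done", ls_name);
 *		f = fopen(path, "w");
 *		if (!f)
 *			return -1;
 *		fprintf(f, "%d\n", result);	// 0 reports success
 *		fclose(f);
 *		return 0;
 *	}
 */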


int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					    dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;
	int timeout_jiffies = dlm_config.ci_scan_secs * HZ;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
		} else {
			schedule_timeout_interruptible(timeout_jiffies);
		}
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
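
/* Illustration only, not part of the original file: the three lookup helpers
 * above return with ls_count elevated under lslist_lock, so each successful
 * find must be paired with dlm_put_lockspace(); remove_lockspace() below
 * spins until ls_count drops to zero.  A hypothetical caller:
 *
 *	struct dlm_ls *ls;
 *
 *	ls = dlm_find_lockspace_global(id);
 *	if (!ls)
 *		return -EINVAL;
 *	// ... use ls while the reference is held ...
 *	dlm_put_lockspace(ls);
 */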

static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	/* Thread which processes lock requests for all lockspaces */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}

static int new_lockspace(const char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		module_put(THIS_MODULE);
		return -EUNATCH;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		spin_lock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_delist;
	}

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_stop;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_stop;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_stop:
	dlm_recoverd_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}
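
/* Illustration only, not part of the original file: a minimal sketch of a
 * kernel-side caller of the exported function above.  new_lockspace()
 * requires namelen <= DLM_LOCKSPACE_LEN and a nonzero lvblen that is a
 * multiple of 8; the name and the 32-byte LVB length here are arbitrary
 * example choices.
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", strlen("example"), &ls,
 *				  DLM_LSFL_FS, 32);
 *	if (error)
 *		return error;	// e.g. -EUNATCH if dlm_controld isn't running
 */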

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
	int i, lkb_found = 0;
	struct dlm_lkb *lkb;

	/* NOTE: We check the lockidtbl here rather than the resource table.
	   This is because there may be LKBs queued as ASTs that have been
	   unlinked from their RSBs and are pending deletion once the AST has
	   been delivered */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		read_lock(&ls->ls_lkbtbl[i].lock);
		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
			lkb_found = 1;
			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
					    lkb_idtbl_list) {
				if (!lkb->lkb_nodeid) {
					read_unlock(&ls->ls_lkbtbl[i].lock);
					return 2;
				}
			}
		}
		read_unlock(&ls->ls_lkbtbl[i].lock);
	}
	return lkb_found;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i, busy, rv;

	busy = lockspace_busy(ls);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy > force)
			rv = -EBUSY;
		else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				dlm_free_lvb(lkb->lkb_lvbptr);

			dlm_free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_debug(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	return error;
}
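
/* Illustration only, not part of the original file: how the force argument
 * interacts with lockspace_busy(), which reports 0 (idle), 1 (remote LKBs
 * only) or 2 (local LKBs).  release_lockspace() refuses with -EBUSY whenever
 * busy > force:
 *
 *	dlm_release_lockspace(ls, 0);	// -EBUSY if any LKBs remain
 *	dlm_release_lockspace(ls, 1);	// tolerates remote LKBs only
 *	dlm_release_lockspace(ls, 2);	// proceeds regardless of LKBs
 *	dlm_release_lockspace(ls, 3);	// forced shutdown; also skips the
 *					// leave uevent to dlm_controld
 */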

void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;

 restart:
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
			continue;
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);
}