xref: /openbmc/linux/fs/dlm/lockspace.c (revision b8bb76713ec50df2f11efee386e16f93d51e1076)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 
28 static int			ls_count;
29 static struct mutex		ls_lock;
30 static struct list_head		lslist;
31 static spinlock_t		lslist_lock;
32 static struct task_struct *	scand_task;
33 
34 
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37 	ssize_t ret = len;
38 	int n = simple_strtol(buf, NULL, 0);
39 
40 	ls = dlm_find_lockspace_local(ls->ls_local_handle);
41 	if (!ls)
42 		return -EINVAL;
43 
44 	switch (n) {
45 	case 0:
46 		dlm_ls_stop(ls);
47 		break;
48 	case 1:
49 		dlm_ls_start(ls);
50 		break;
51 	default:
52 		ret = -EINVAL;
53 	}
54 	dlm_put_lockspace(ls);
55 	return ret;
56 }
57 
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 	wake_up(&ls->ls_uevent_wait);
63 	return len;
64 }
65 
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68 	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70 
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 	return len;
75 }
76 
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79 	uint32_t status = dlm_recover_status(ls);
80 	return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82 
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85 	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87 
88 struct dlm_attr {
89 	struct attribute attr;
90 	ssize_t (*show)(struct dlm_ls *, char *);
91 	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
92 };
93 
94 static struct dlm_attr dlm_attr_control = {
95 	.attr  = {.name = "control", .mode = S_IWUSR},
96 	.store = dlm_control_store
97 };
98 
99 static struct dlm_attr dlm_attr_event = {
100 	.attr  = {.name = "event_done", .mode = S_IWUSR},
101 	.store = dlm_event_store
102 };
103 
104 static struct dlm_attr dlm_attr_id = {
105 	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
106 	.show  = dlm_id_show,
107 	.store = dlm_id_store
108 };
109 
110 static struct dlm_attr dlm_attr_recover_status = {
111 	.attr  = {.name = "recover_status", .mode = S_IRUGO},
112 	.show  = dlm_recover_status_show
113 };
114 
115 static struct dlm_attr dlm_attr_recover_nodeid = {
116 	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
117 	.show  = dlm_recover_nodeid_show
118 };
119 
120 static struct attribute *dlm_attrs[] = {
121 	&dlm_attr_control.attr,
122 	&dlm_attr_event.attr,
123 	&dlm_attr_id.attr,
124 	&dlm_attr_recover_status.attr,
125 	&dlm_attr_recover_nodeid.attr,
126 	NULL,
127 };
128 
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130 			     char *buf)
131 {
132 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134 	return a->show ? a->show(ls, buf) : 0;
135 }
136 
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138 			      const char *buf, size_t len)
139 {
140 	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141 	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142 	return a->store ? a->store(ls, buf, len) : len;
143 }
144 
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147 	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148 	kfree(ls);
149 }
150 
151 static struct sysfs_ops dlm_attr_ops = {
152 	.show  = dlm_attr_show,
153 	.store = dlm_attr_store,
154 };
155 
156 static struct kobj_type dlm_ktype = {
157 	.default_attrs = dlm_attrs,
158 	.sysfs_ops     = &dlm_attr_ops,
159 	.release       = lockspace_kobj_release,
160 };
161 
162 static struct kset *dlm_kset;
163 
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166 	int error;
167 
168 	if (in)
169 		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170 	else
171 		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172 
173 	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174 
175 	/* dlm_controld will see the uevent, do the necessary group management
176 	   and then write to sysfs to wake us */
177 
178 	error = wait_event_interruptible(ls->ls_uevent_wait,
179 			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180 
181 	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182 
183 	if (error)
184 		goto out;
185 
186 	error = ls->ls_uevent_result;
187  out:
188 	if (error)
189 		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190 			  error, ls->ls_uevent_result);
191 	return error;
192 }
193 
194 
195 int __init dlm_lockspace_init(void)
196 {
197 	ls_count = 0;
198 	mutex_init(&ls_lock);
199 	INIT_LIST_HEAD(&lslist);
200 	spin_lock_init(&lslist_lock);
201 
202 	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
203 	if (!dlm_kset) {
204 		printk(KERN_WARNING "%s: can not create kset\n", __func__);
205 		return -ENOMEM;
206 	}
207 	return 0;
208 }
209 
210 void dlm_lockspace_exit(void)
211 {
212 	kset_unregister(dlm_kset);
213 }
214 
215 static struct dlm_ls *find_ls_to_scan(void)
216 {
217 	struct dlm_ls *ls;
218 
219 	spin_lock(&lslist_lock);
220 	list_for_each_entry(ls, &lslist, ls_list) {
221 		if (time_after_eq(jiffies, ls->ls_scan_time +
222 					    dlm_config.ci_scan_secs * HZ)) {
223 			spin_unlock(&lslist_lock);
224 			return ls;
225 		}
226 	}
227 	spin_unlock(&lslist_lock);
228 	return NULL;
229 }
230 
231 static int dlm_scand(void *data)
232 {
233 	struct dlm_ls *ls;
234 	int timeout_jiffies = dlm_config.ci_scan_secs * HZ;
235 
236 	while (!kthread_should_stop()) {
237 		ls = find_ls_to_scan();
238 		if (ls) {
239 			if (dlm_lock_recovery_try(ls)) {
240 				ls->ls_scan_time = jiffies;
241 				dlm_scan_rsbs(ls);
242 				dlm_scan_timeout(ls);
243 				dlm_unlock_recovery(ls);
244 			} else {
245 				ls->ls_scan_time += HZ;
246 			}
247 		} else {
248 			schedule_timeout_interruptible(timeout_jiffies);
249 		}
250 	}
251 	return 0;
252 }
253 
254 static int dlm_scand_start(void)
255 {
256 	struct task_struct *p;
257 	int error = 0;
258 
259 	p = kthread_run(dlm_scand, NULL, "dlm_scand");
260 	if (IS_ERR(p))
261 		error = PTR_ERR(p);
262 	else
263 		scand_task = p;
264 	return error;
265 }
266 
267 static void dlm_scand_stop(void)
268 {
269 	kthread_stop(scand_task);
270 }
271 
272 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
273 {
274 	struct dlm_ls *ls;
275 
276 	spin_lock(&lslist_lock);
277 
278 	list_for_each_entry(ls, &lslist, ls_list) {
279 		if (ls->ls_global_id == id) {
280 			ls->ls_count++;
281 			goto out;
282 		}
283 	}
284 	ls = NULL;
285  out:
286 	spin_unlock(&lslist_lock);
287 	return ls;
288 }
289 
290 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
291 {
292 	struct dlm_ls *ls;
293 
294 	spin_lock(&lslist_lock);
295 	list_for_each_entry(ls, &lslist, ls_list) {
296 		if (ls->ls_local_handle == lockspace) {
297 			ls->ls_count++;
298 			goto out;
299 		}
300 	}
301 	ls = NULL;
302  out:
303 	spin_unlock(&lslist_lock);
304 	return ls;
305 }
306 
307 struct dlm_ls *dlm_find_lockspace_device(int minor)
308 {
309 	struct dlm_ls *ls;
310 
311 	spin_lock(&lslist_lock);
312 	list_for_each_entry(ls, &lslist, ls_list) {
313 		if (ls->ls_device.minor == minor) {
314 			ls->ls_count++;
315 			goto out;
316 		}
317 	}
318 	ls = NULL;
319  out:
320 	spin_unlock(&lslist_lock);
321 	return ls;
322 }
323 
324 void dlm_put_lockspace(struct dlm_ls *ls)
325 {
326 	spin_lock(&lslist_lock);
327 	ls->ls_count--;
328 	spin_unlock(&lslist_lock);
329 }
330 
331 static void remove_lockspace(struct dlm_ls *ls)
332 {
333 	for (;;) {
334 		spin_lock(&lslist_lock);
335 		if (ls->ls_count == 0) {
336 			WARN_ON(ls->ls_create_count != 0);
337 			list_del(&ls->ls_list);
338 			spin_unlock(&lslist_lock);
339 			return;
340 		}
341 		spin_unlock(&lslist_lock);
342 		ssleep(1);
343 	}
344 }
345 
346 static int threads_start(void)
347 {
348 	int error;
349 
350 	/* Thread which process lock requests for all lockspace's */
351 	error = dlm_astd_start();
352 	if (error) {
353 		log_print("cannot start dlm_astd thread %d", error);
354 		goto fail;
355 	}
356 
357 	error = dlm_scand_start();
358 	if (error) {
359 		log_print("cannot start dlm_scand thread %d", error);
360 		goto astd_fail;
361 	}
362 
363 	/* Thread for sending/receiving messages for all lockspace's */
364 	error = dlm_lowcomms_start();
365 	if (error) {
366 		log_print("cannot start dlm lowcomms %d", error);
367 		goto scand_fail;
368 	}
369 
370 	return 0;
371 
372  scand_fail:
373 	dlm_scand_stop();
374  astd_fail:
375 	dlm_astd_stop();
376  fail:
377 	return error;
378 }
379 
380 static void threads_stop(void)
381 {
382 	dlm_scand_stop();
383 	dlm_lowcomms_stop();
384 	dlm_astd_stop();
385 }
386 
387 static int new_lockspace(char *name, int namelen, void **lockspace,
388 			 uint32_t flags, int lvblen)
389 {
390 	struct dlm_ls *ls;
391 	int i, size, error;
392 	int do_unreg = 0;
393 
394 	if (namelen > DLM_LOCKSPACE_LEN)
395 		return -EINVAL;
396 
397 	if (!lvblen || (lvblen % 8))
398 		return -EINVAL;
399 
400 	if (!try_module_get(THIS_MODULE))
401 		return -EINVAL;
402 
403 	if (!dlm_user_daemon_available()) {
404 		module_put(THIS_MODULE);
405 		return -EUNATCH;
406 	}
407 
408 	error = 0;
409 
410 	spin_lock(&lslist_lock);
411 	list_for_each_entry(ls, &lslist, ls_list) {
412 		WARN_ON(ls->ls_create_count <= 0);
413 		if (ls->ls_namelen != namelen)
414 			continue;
415 		if (memcmp(ls->ls_name, name, namelen))
416 			continue;
417 		if (flags & DLM_LSFL_NEWEXCL) {
418 			error = -EEXIST;
419 			break;
420 		}
421 		ls->ls_create_count++;
422 		module_put(THIS_MODULE);
423 		error = 1; /* not an error, return 0 */
424 		break;
425 	}
426 	spin_unlock(&lslist_lock);
427 
428 	if (error < 0)
429 		goto out;
430 	if (error)
431 		goto ret_zero;
432 
433 	error = -ENOMEM;
434 
435 	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
436 	if (!ls)
437 		goto out;
438 	memcpy(ls->ls_name, name, namelen);
439 	ls->ls_namelen = namelen;
440 	ls->ls_lvblen = lvblen;
441 	ls->ls_count = 0;
442 	ls->ls_flags = 0;
443 	ls->ls_scan_time = jiffies;
444 
445 	if (flags & DLM_LSFL_TIMEWARN)
446 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
447 
448 	if (flags & DLM_LSFL_FS)
449 		ls->ls_allocation = GFP_NOFS;
450 	else
451 		ls->ls_allocation = GFP_KERNEL;
452 
453 	/* ls_exflags are forced to match among nodes, and we don't
454 	   need to require all nodes to have some flags set */
455 	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
456 				    DLM_LSFL_NEWEXCL));
457 
458 	size = dlm_config.ci_rsbtbl_size;
459 	ls->ls_rsbtbl_size = size;
460 
461 	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
462 	if (!ls->ls_rsbtbl)
463 		goto out_lsfree;
464 	for (i = 0; i < size; i++) {
465 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
466 		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
467 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
468 	}
469 
470 	size = dlm_config.ci_lkbtbl_size;
471 	ls->ls_lkbtbl_size = size;
472 
473 	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
474 	if (!ls->ls_lkbtbl)
475 		goto out_rsbfree;
476 	for (i = 0; i < size; i++) {
477 		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
478 		rwlock_init(&ls->ls_lkbtbl[i].lock);
479 		ls->ls_lkbtbl[i].counter = 1;
480 	}
481 
482 	size = dlm_config.ci_dirtbl_size;
483 	ls->ls_dirtbl_size = size;
484 
485 	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
486 	if (!ls->ls_dirtbl)
487 		goto out_lkbfree;
488 	for (i = 0; i < size; i++) {
489 		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
490 		spin_lock_init(&ls->ls_dirtbl[i].lock);
491 	}
492 
493 	INIT_LIST_HEAD(&ls->ls_waiters);
494 	mutex_init(&ls->ls_waiters_mutex);
495 	INIT_LIST_HEAD(&ls->ls_orphans);
496 	mutex_init(&ls->ls_orphans_mutex);
497 	INIT_LIST_HEAD(&ls->ls_timeout);
498 	mutex_init(&ls->ls_timeout_mutex);
499 
500 	INIT_LIST_HEAD(&ls->ls_nodes);
501 	INIT_LIST_HEAD(&ls->ls_nodes_gone);
502 	ls->ls_num_nodes = 0;
503 	ls->ls_low_nodeid = 0;
504 	ls->ls_total_weight = 0;
505 	ls->ls_node_array = NULL;
506 
507 	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
508 	ls->ls_stub_rsb.res_ls = ls;
509 
510 	ls->ls_debug_rsb_dentry = NULL;
511 	ls->ls_debug_waiters_dentry = NULL;
512 
513 	init_waitqueue_head(&ls->ls_uevent_wait);
514 	ls->ls_uevent_result = 0;
515 	init_completion(&ls->ls_members_done);
516 	ls->ls_members_result = -1;
517 
518 	ls->ls_recoverd_task = NULL;
519 	mutex_init(&ls->ls_recoverd_active);
520 	spin_lock_init(&ls->ls_recover_lock);
521 	spin_lock_init(&ls->ls_rcom_spin);
522 	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
523 	ls->ls_recover_status = 0;
524 	ls->ls_recover_seq = 0;
525 	ls->ls_recover_args = NULL;
526 	init_rwsem(&ls->ls_in_recovery);
527 	init_rwsem(&ls->ls_recv_active);
528 	INIT_LIST_HEAD(&ls->ls_requestqueue);
529 	mutex_init(&ls->ls_requestqueue_mutex);
530 	mutex_init(&ls->ls_clear_proc_locks);
531 
532 	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
533 	if (!ls->ls_recover_buf)
534 		goto out_dirfree;
535 
536 	INIT_LIST_HEAD(&ls->ls_recover_list);
537 	spin_lock_init(&ls->ls_recover_list_lock);
538 	ls->ls_recover_list_count = 0;
539 	ls->ls_local_handle = ls;
540 	init_waitqueue_head(&ls->ls_wait_general);
541 	INIT_LIST_HEAD(&ls->ls_root_list);
542 	init_rwsem(&ls->ls_root_sem);
543 
544 	down_write(&ls->ls_in_recovery);
545 
546 	spin_lock(&lslist_lock);
547 	ls->ls_create_count = 1;
548 	list_add(&ls->ls_list, &lslist);
549 	spin_unlock(&lslist_lock);
550 
551 	/* needs to find ls in lslist */
552 	error = dlm_recoverd_start(ls);
553 	if (error) {
554 		log_error(ls, "can't start dlm_recoverd %d", error);
555 		goto out_delist;
556 	}
557 
558 	ls->ls_kobj.kset = dlm_kset;
559 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
560 				     "%s", ls->ls_name);
561 	if (error)
562 		goto out_stop;
563 	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
564 
565 	/* let kobject handle freeing of ls if there's an error */
566 	do_unreg = 1;
567 
568 	/* This uevent triggers dlm_controld in userspace to add us to the
569 	   group of nodes that are members of this lockspace (managed by the
570 	   cluster infrastructure.)  Once it's done that, it tells us who the
571 	   current lockspace members are (via configfs) and then tells the
572 	   lockspace to start running (via sysfs) in dlm_ls_start(). */
573 
574 	error = do_uevent(ls, 1);
575 	if (error)
576 		goto out_stop;
577 
578 	wait_for_completion(&ls->ls_members_done);
579 	error = ls->ls_members_result;
580 	if (error)
581 		goto out_members;
582 
583 	dlm_create_debug_file(ls);
584 
585 	log_debug(ls, "join complete");
586  ret_zero:
587 	*lockspace = ls;
588 	return 0;
589 
590  out_members:
591 	do_uevent(ls, 0);
592 	dlm_clear_members(ls);
593 	kfree(ls->ls_node_array);
594  out_stop:
595 	dlm_recoverd_stop(ls);
596  out_delist:
597 	spin_lock(&lslist_lock);
598 	list_del(&ls->ls_list);
599 	spin_unlock(&lslist_lock);
600 	kfree(ls->ls_recover_buf);
601  out_dirfree:
602 	kfree(ls->ls_dirtbl);
603  out_lkbfree:
604 	kfree(ls->ls_lkbtbl);
605  out_rsbfree:
606 	kfree(ls->ls_rsbtbl);
607  out_lsfree:
608 	if (do_unreg)
609 		kobject_put(&ls->ls_kobj);
610 	else
611 		kfree(ls);
612  out:
613 	module_put(THIS_MODULE);
614 	return error;
615 }
616 
617 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
618 		      uint32_t flags, int lvblen)
619 {
620 	int error = 0;
621 
622 	mutex_lock(&ls_lock);
623 	if (!ls_count)
624 		error = threads_start();
625 	if (error)
626 		goto out;
627 
628 	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
629 	if (!error)
630 		ls_count++;
631 	else if (!ls_count)
632 		threads_stop();
633  out:
634 	mutex_unlock(&ls_lock);
635 	return error;
636 }
637 
638 /* Return 1 if the lockspace still has active remote locks,
639  *        2 if the lockspace still has active local locks.
640  */
641 static int lockspace_busy(struct dlm_ls *ls)
642 {
643 	int i, lkb_found = 0;
644 	struct dlm_lkb *lkb;
645 
646 	/* NOTE: We check the lockidtbl here rather than the resource table.
647 	   This is because there may be LKBs queued as ASTs that have been
648 	   unlinked from their RSBs and are pending deletion once the AST has
649 	   been delivered */
650 
651 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
652 		read_lock(&ls->ls_lkbtbl[i].lock);
653 		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
654 			lkb_found = 1;
655 			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
656 					    lkb_idtbl_list) {
657 				if (!lkb->lkb_nodeid) {
658 					read_unlock(&ls->ls_lkbtbl[i].lock);
659 					return 2;
660 				}
661 			}
662 		}
663 		read_unlock(&ls->ls_lkbtbl[i].lock);
664 	}
665 	return lkb_found;
666 }
667 
668 static int release_lockspace(struct dlm_ls *ls, int force)
669 {
670 	struct dlm_lkb *lkb;
671 	struct dlm_rsb *rsb;
672 	struct list_head *head;
673 	int i, busy, rv;
674 
675 	busy = lockspace_busy(ls);
676 
677 	spin_lock(&lslist_lock);
678 	if (ls->ls_create_count == 1) {
679 		if (busy > force)
680 			rv = -EBUSY;
681 		else {
682 			/* remove_lockspace takes ls off lslist */
683 			ls->ls_create_count = 0;
684 			rv = 0;
685 		}
686 	} else if (ls->ls_create_count > 1) {
687 		rv = --ls->ls_create_count;
688 	} else {
689 		rv = -EINVAL;
690 	}
691 	spin_unlock(&lslist_lock);
692 
693 	if (rv) {
694 		log_debug(ls, "release_lockspace no remove %d", rv);
695 		return rv;
696 	}
697 
698 	dlm_device_deregister(ls);
699 
700 	if (force < 3 && dlm_user_daemon_available())
701 		do_uevent(ls, 0);
702 
703 	dlm_recoverd_stop(ls);
704 
705 	remove_lockspace(ls);
706 
707 	dlm_delete_debug_file(ls);
708 
709 	dlm_astd_suspend();
710 
711 	kfree(ls->ls_recover_buf);
712 
713 	/*
714 	 * Free direntry structs.
715 	 */
716 
717 	dlm_dir_clear(ls);
718 	kfree(ls->ls_dirtbl);
719 
720 	/*
721 	 * Free all lkb's on lkbtbl[] lists.
722 	 */
723 
724 	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
725 		head = &ls->ls_lkbtbl[i].list;
726 		while (!list_empty(head)) {
727 			lkb = list_entry(head->next, struct dlm_lkb,
728 					 lkb_idtbl_list);
729 
730 			list_del(&lkb->lkb_idtbl_list);
731 
732 			dlm_del_ast(lkb);
733 
734 			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735 				dlm_free_lvb(lkb->lkb_lvbptr);
736 
737 			dlm_free_lkb(lkb);
738 		}
739 	}
740 	dlm_astd_resume();
741 
742 	kfree(ls->ls_lkbtbl);
743 
744 	/*
745 	 * Free all rsb's on rsbtbl[] lists
746 	 */
747 
748 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
749 		head = &ls->ls_rsbtbl[i].list;
750 		while (!list_empty(head)) {
751 			rsb = list_entry(head->next, struct dlm_rsb,
752 					 res_hashchain);
753 
754 			list_del(&rsb->res_hashchain);
755 			dlm_free_rsb(rsb);
756 		}
757 
758 		head = &ls->ls_rsbtbl[i].toss;
759 		while (!list_empty(head)) {
760 			rsb = list_entry(head->next, struct dlm_rsb,
761 					 res_hashchain);
762 			list_del(&rsb->res_hashchain);
763 			dlm_free_rsb(rsb);
764 		}
765 	}
766 
767 	kfree(ls->ls_rsbtbl);
768 
769 	/*
770 	 * Free structures on any other lists
771 	 */
772 
773 	dlm_purge_requestqueue(ls);
774 	kfree(ls->ls_recover_args);
775 	dlm_clear_free_entries(ls);
776 	dlm_clear_members(ls);
777 	dlm_clear_members_gone(ls);
778 	kfree(ls->ls_node_array);
779 	log_debug(ls, "release_lockspace final free");
780 	kobject_put(&ls->ls_kobj);
781 	/* The ls structure will be freed when the kobject is done with */
782 
783 	module_put(THIS_MODULE);
784 	return 0;
785 }
786 
787 /*
788  * Called when a system has released all its locks and is not going to use the
789  * lockspace any longer.  We free everything we're managing for this lockspace.
790  * Remaining nodes will go through the recovery process as if we'd died.  The
791  * lockspace must continue to function as usual, participating in recoveries,
792  * until this returns.
793  *
794  * Force has 4 possible values:
795  * 0 - don't destroy locksapce if it has any LKBs
796  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
797  * 2 - destroy lockspace regardless of LKBs
798  * 3 - destroy lockspace as part of a forced shutdown
799  */
800 
801 int dlm_release_lockspace(void *lockspace, int force)
802 {
803 	struct dlm_ls *ls;
804 	int error;
805 
806 	ls = dlm_find_lockspace_local(lockspace);
807 	if (!ls)
808 		return -EINVAL;
809 	dlm_put_lockspace(ls);
810 
811 	mutex_lock(&ls_lock);
812 	error = release_lockspace(ls, force);
813 	if (!error)
814 		ls_count--;
815 	if (!ls_count)
816 		threads_stop();
817 	mutex_unlock(&ls_lock);
818 
819 	return error;
820 }
821 
822 void dlm_stop_lockspaces(void)
823 {
824 	struct dlm_ls *ls;
825 
826  restart:
827 	spin_lock(&lslist_lock);
828 	list_for_each_entry(ls, &lslist, ls_list) {
829 		if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
830 			continue;
831 		spin_unlock(&lslist_lock);
832 		log_error(ls, "no userland control daemon, stopping lockspace");
833 		dlm_ls_stop(ls);
834 		goto restart;
835 	}
836 	spin_unlock(&lslist_lock);
837 }
838 
839