/* fs/dlm/lockspace.c (xref revision 643d1f7f) */
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"

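/*
 * Module-global state: ls_count tracks how many lockspaces exist (guarded
 * by ls_lock, which also serializes starting and stopping the shared
 * threads), lslist is the list of all lockspaces (guarded by lslist_lock),
 * and scand_task is the shared dlm_scand kthread.
 */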
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;

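/*
 * sysfs store/show handlers for the per-lockspace kobject.  dlm_controld
 * in userspace drives recovery through these files: writing 0 or 1 to
 * "control" stops or starts the lockspace, and writing a result to
 * "event_done" wakes a kernel thread waiting in do_uevent() below.
 */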
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n = simple_strtol(buf, NULL, 0);

	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_global_id = simple_strtoul(buf, NULL, 0);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

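/*
 * Attributes exported for each lockspace; with the "dlm" kset created in
 * dlm_lockspace_init() these appear as files under
 * /sys/kernel/dlm/<lockspace name>/.
 */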
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	int error;

	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	error = wait_event_interruptible(ls->ls_uevent_wait,
			test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

	if (error)
		goto out;

	error = ls->ls_uevent_result;
 out:
	if (error)
		log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
			  error, ls->ls_uevent_result);
	return error;
}
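/*
 * Called once from dlm module init/exit.  Init creates the "dlm" kset,
 * which appears as /sys/kernel/dlm and parents every lockspace kobject.
 */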
int dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

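/*
 * dlm_scand periodically walks every lockspace, tossing unused rsbs and
 * checking for lock timeouts, but only when it can take the recovery lock
 * without blocking; the interval comes from the scan_secs config value.
 */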
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list) {
			if (dlm_lock_recovery_try(ls)) {
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_unlock_recovery(ls);
			}
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

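/*
 * Lockspace lookup helpers.  dlm_find_lockspace_name() only checks for
 * existence and takes no reference; the global/local/device variants bump
 * ls_count under lslist_lock, and each successful lookup must be paired
 * with dlm_put_lockspace().
 */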
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_namelen == namelen &&
		    memcmp(ls->ls_name, name, namelen) == 0)
			goto out;
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}

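/*
 * Unlink the lockspace from lslist, polling once a second until every
 * reference taken by the lookup functions above has been dropped.
 */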
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	/* Thread which processes lock requests for all lockspaces */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		goto fail;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto astd_fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 astd_fail:
	dlm_astd_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}

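/*
 * The bulk of lockspace creation: allocate and initialize the dlm_ls
 * structure and its hash tables, add it to lslist, start dlm_recoverd,
 * register the sysfs kobject, then hand control to dlm_controld via a
 * uevent so it can join us to the cluster-wide lockspace group.
 */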
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;
	int do_unreg = 0;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	if (flags & DLM_LSFL_FS)
		ls->ls_allocation = GFP_NOFS;
	else
		ls->ls_allocation = GFP_KERNEL;

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have TIMEWARN or FS set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

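	/*
	 * Two more hash tables follow: lkbtbl maps lock ids to lkbs (the
	 * per-bucket counter is used when minting new lock ids in lock.c),
	 * and dirtbl is the resource directory used to find which node
	 * masters a resource.  All sizes come from the configfs tunables.
	 */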
	size = dlm_config.ci_lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	size = dlm_config.ci_dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	down_write(&ls->ls_in_recovery);

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	/* needs to find ls in lslist */
	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_delist;
	}

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_stop;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_stop;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_debug(ls, "join complete");

	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_stop:
	dlm_recoverd_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

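/*
 * dlm_new_lockspace() is the public entry point; creating the first
 * lockspace also starts the shared kernel threads.  A minimal caller
 * sketch (illustrative only, not taken from this file; "example" and the
 * lvblen of 32 are arbitrary):
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("example", strlen("example"), &ls,
 *				  DLM_LSFL_FS, 32);
 *	if (!error)
 *		error = dlm_release_lockspace(ls, 0);
 *
 * lvblen must be a nonzero multiple of 8, and names longer than
 * DLM_LOCKSPACE_LEN are rejected with -EINVAL.
 */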
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
		      uint32_t flags, int lvblen)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
	if (!error)
		ls_count++;
	else if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks,
 *        0 if it has no locks at all.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
	int i, lkb_found = 0;
	struct dlm_lkb *lkb;

	/* NOTE: We check the lkb table here rather than the resource table.
	   This is because there may be LKBs queued as ASTs that have been
	   unlinked from their RSBs and are pending deletion once the AST has
	   been delivered */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		read_lock(&ls->ls_lkbtbl[i].lock);
		if (!list_empty(&ls->ls_lkbtbl[i].list)) {
			lkb_found = 1;
			list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
					    lkb_idtbl_list) {
				if (!lkb->lkb_nodeid) {
					read_unlock(&ls->ls_lkbtbl[i].lock);
					return 2;
				}
			}
		}
		read_unlock(&ls->ls_lkbtbl[i].lock);
	}
	return lkb_found;
}

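/*
 * Teardown proper: stop recovery, wait for lookups to drop their
 * references, then free everything in dependency order.  ASTs are
 * suspended while the lkbs are freed so the ast thread never sees a
 * half-torn-down lkb.
 */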
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	if (busy > force)
		return -EBUSY;

	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				dlm_free_lvb(lkb->lkb_lvbptr);

			dlm_free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			dlm_free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);
	return release_lockspace(ls, force);
}
761