xref: /openbmc/linux/fs/dlm/user.c (revision 93696d8f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright (C) 2006-2010 Red Hat, Inc.  All rights reserved.
4  */
5 
6 #include <linux/miscdevice.h>
7 #include <linux/init.h>
8 #include <linux/wait.h>
9 #include <linux/file.h>
10 #include <linux/fs.h>
11 #include <linux/poll.h>
12 #include <linux/signal.h>
13 #include <linux/spinlock.h>
14 #include <linux/dlm.h>
15 #include <linux/dlm_device.h>
16 #include <linux/slab.h>
17 #include <linux/sched/signal.h>
18 
19 #include <trace/events/dlm.h>
20 
21 #include "dlm_internal.h"
22 #include "lockspace.h"
23 #include "lock.h"
24 #include "user.h"
25 #include "ast.h"
26 #include "config.h"
27 #include "memory.h"
28 
29 static const char name_prefix[] = "dlm";
30 static const struct file_operations device_fops;
31 static atomic_t dlm_monitor_opened;
32 static int dlm_monitor_unused = 1;
33 
34 #ifdef CONFIG_COMPAT
35 
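/*
 * 32-bit compatibility: a 32-bit process running on a 64-bit kernel passes
 * request and result structures in which the pointer-sized fields are only
 * 32 bits wide.  The *32 structures below mirror the native layouts with
 * __u32 in place of pointers; compat_input() and compat_output() translate
 * between the two.
 */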
36 struct dlm_lock_params32 {
37 	__u8 mode;
38 	__u8 namelen;
39 	__u16 unused;
40 	__u32 flags;
41 	__u32 lkid;
42 	__u32 parent;
43 	__u64 xid;
44 	__u64 timeout;
45 	__u32 castparam;
46 	__u32 castaddr;
47 	__u32 bastparam;
48 	__u32 bastaddr;
49 	__u32 lksb;
50 	char lvb[DLM_USER_LVB_LEN];
51 	char name[];
52 };
53 
54 struct dlm_write_request32 {
55 	__u32 version[3];
56 	__u8 cmd;
57 	__u8 is64bit;
58 	__u8 unused[2];
59 
60 	union  {
61 		struct dlm_lock_params32 lock;
62 		struct dlm_lspace_params lspace;
63 		struct dlm_purge_params purge;
64 	} i;
65 };
66 
67 struct dlm_lksb32 {
68 	__u32 sb_status;
69 	__u32 sb_lkid;
70 	__u8 sb_flags;
71 	__u32 sb_lvbptr;
72 };
73 
74 struct dlm_lock_result32 {
75 	__u32 version[3];
76 	__u32 length;
77 	__u32 user_astaddr;
78 	__u32 user_astparam;
79 	__u32 user_lksb;
80 	struct dlm_lksb32 lksb;
81 	__u8 bast_mode;
82 	__u8 unused[3];
83 	/* Offsets may be zero if no data is present */
84 	__u32 lvb_offset;
85 };
86 
87 static void compat_input(struct dlm_write_request *kb,
88 			 struct dlm_write_request32 *kb32,
89 			 int namelen)
90 {
91 	kb->version[0] = kb32->version[0];
92 	kb->version[1] = kb32->version[1];
93 	kb->version[2] = kb32->version[2];
94 
95 	kb->cmd = kb32->cmd;
96 	kb->is64bit = kb32->is64bit;
97 	if (kb->cmd == DLM_USER_CREATE_LOCKSPACE ||
98 	    kb->cmd == DLM_USER_REMOVE_LOCKSPACE) {
99 		kb->i.lspace.flags = kb32->i.lspace.flags;
100 		kb->i.lspace.minor = kb32->i.lspace.minor;
101 		memcpy(kb->i.lspace.name, kb32->i.lspace.name, namelen);
102 	} else if (kb->cmd == DLM_USER_PURGE) {
103 		kb->i.purge.nodeid = kb32->i.purge.nodeid;
104 		kb->i.purge.pid = kb32->i.purge.pid;
105 	} else {
106 		kb->i.lock.mode = kb32->i.lock.mode;
107 		kb->i.lock.namelen = kb32->i.lock.namelen;
108 		kb->i.lock.flags = kb32->i.lock.flags;
109 		kb->i.lock.lkid = kb32->i.lock.lkid;
110 		kb->i.lock.parent = kb32->i.lock.parent;
111 		kb->i.lock.xid = kb32->i.lock.xid;
112 		kb->i.lock.timeout = kb32->i.lock.timeout;
113 		kb->i.lock.castparam = (__user void *)(long)kb32->i.lock.castparam;
114 		kb->i.lock.castaddr = (__user void *)(long)kb32->i.lock.castaddr;
115 		kb->i.lock.bastparam = (__user void *)(long)kb32->i.lock.bastparam;
116 		kb->i.lock.bastaddr = (__user void *)(long)kb32->i.lock.bastaddr;
117 		kb->i.lock.lksb = (__user void *)(long)kb32->i.lock.lksb;
118 		memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN);
119 		memcpy(kb->i.lock.name, kb32->i.lock.name, namelen);
120 	}
121 }
122 
123 static void compat_output(struct dlm_lock_result *res,
124 			  struct dlm_lock_result32 *res32)
125 {
126 	memset(res32, 0, sizeof(*res32));
127 
128 	res32->version[0] = res->version[0];
129 	res32->version[1] = res->version[1];
130 	res32->version[2] = res->version[2];
131 
132 	res32->user_astaddr = (__u32)(__force long)res->user_astaddr;
133 	res32->user_astparam = (__u32)(__force long)res->user_astparam;
134 	res32->user_lksb = (__u32)(__force long)res->user_lksb;
135 	res32->bast_mode = res->bast_mode;
136 
137 	res32->lvb_offset = res->lvb_offset;
138 	res32->length = res->length;
139 
140 	res32->lksb.sb_status = res->lksb.sb_status;
141 	res32->lksb.sb_flags = res->lksb.sb_flags;
142 	res32->lksb.sb_lkid = res->lksb.sb_lkid;
143 	res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
144 }
145 #endif
146 
147 /* the caller should hold the proc->asts_spin lock */
148 void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
149 {
150 	struct dlm_callback *cb, *safe;
151 
152 	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
153 		list_del(&cb->list);
154 		kref_put(&cb->ref, dlm_release_callback);
155 	}
156 
157 	clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
158 
159 	/* invalidate */
160 	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
161 	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
162 	lkb->lkb_last_bast_mode = -1;
163 }
164 
165 /* Figure out if this lock is at the end of its life and no longer
166    available for the application to use.  The lkb still exists until
167    the final ast is read.  A lock becomes EOL in three situations:
168      1. a noqueue request fails with EAGAIN
169      2. an unlock completes with EUNLOCK
170      3. a cancel of a waiting request completes with ECANCEL/EDEADLK
171    An EOL lock needs to be removed from the process's list of locks.
172    And we can't allow any new operation on an EOL lock.  This is
173    not related to the lifetime of the lkb struct which is managed
174    entirely by refcount. */
175 
176 static int lkb_is_endoflife(int mode, int status)
177 {
178 	switch (status) {
179 	case -DLM_EUNLOCK:
180 		return 1;
181 	case -DLM_ECANCEL:
182 	case -ETIMEDOUT:
183 	case -EDEADLK:
184 	case -EAGAIN:
185 		if (mode == DLM_LOCK_IV)
186 			return 1;
187 		break;
188 	}
189 	return 0;
190 }
191 
192 /* we could possibly check if the cancel of an orphan has resulted in the lkb
193    being removed and then remove that lkb from the orphans list and free it */
194 
195 void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
196 		      int status, uint32_t sbflags)
197 {
198 	struct dlm_ls *ls;
199 	struct dlm_user_args *ua;
200 	struct dlm_user_proc *proc;
201 	int rv;
202 
203 	if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
204 	    test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
205 		return;
206 
207 	ls = lkb->lkb_resource->res_ls;
208 	spin_lock(&ls->ls_clear_proc_locks);
209 
210 	/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
211 	   can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
212 	   lkb->ua so we can't try to use it.  This second check is necessary
213 	   for cases where a completion ast is received for an operation that
214 	   began before clear_proc_locks did its cancel/unlock. */
215 
216 	if (test_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags) ||
217 	    test_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags))
218 		goto out;
219 
220 	DLM_ASSERT(lkb->lkb_ua, dlm_print_lkb(lkb););
221 	ua = lkb->lkb_ua;
222 	proc = ua->proc;
223 
224 	if ((flags & DLM_CB_BAST) && ua->bastaddr == NULL)
225 		goto out;
226 
227 	if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
228 		set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
229 
230 	spin_lock(&proc->asts_spin);
231 
232 	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
233 	switch (rv) {
234 	case DLM_ENQUEUE_CALLBACK_FAILURE:
235 		spin_unlock(&proc->asts_spin);
236 		WARN_ON_ONCE(1);
237 		goto out;
238 	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
239 		kref_get(&lkb->lkb_ref);
240 		list_add_tail(&lkb->lkb_cb_list, &proc->asts);
241 		wake_up_interruptible(&proc->wait);
242 		break;
243 	case DLM_ENQUEUE_CALLBACK_SUCCESS:
244 		break;
245 	default:
246 		WARN_ON_ONCE(1);
247 		break;
248 	}
249 	spin_unlock(&proc->asts_spin);
250 
251 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
252 		/* N.B. spin_lock locks_spin, not asts_spin */
253 		spin_lock(&proc->locks_spin);
254 		if (!list_empty(&lkb->lkb_ownqueue)) {
255 			list_del_init(&lkb->lkb_ownqueue);
256 			dlm_put_lkb(lkb);
257 		}
258 		spin_unlock(&proc->locks_spin);
259 	}
260  out:
261 	spin_unlock(&ls->ls_clear_proc_locks);
262 }
263 
264 static int device_user_lock(struct dlm_user_proc *proc,
265 			    struct dlm_lock_params *params)
266 {
267 	struct dlm_ls *ls;
268 	struct dlm_user_args *ua;
269 	uint32_t lkid;
270 	int error = -ENOMEM;
271 
272 	ls = dlm_find_lockspace_local(proc->lockspace);
273 	if (!ls)
274 		return -ENOENT;
275 
276 	if (!params->castaddr || !params->lksb) {
277 		error = -EINVAL;
278 		goto out;
279 	}
280 
281 	ua = kzalloc(sizeof(struct dlm_user_args), GFP_NOFS);
282 	if (!ua)
283 		goto out;
284 	ua->proc = proc;
285 	ua->user_lksb = params->lksb;
286 	ua->castparam = params->castparam;
287 	ua->castaddr = params->castaddr;
288 	ua->bastparam = params->bastparam;
289 	ua->bastaddr = params->bastaddr;
290 	ua->xid = params->xid;
291 
292 	if (params->flags & DLM_LKF_CONVERT) {
293 		error = dlm_user_convert(ls, ua,
294 					 params->mode, params->flags,
295 					 params->lkid, params->lvb);
296 	} else if (params->flags & DLM_LKF_ORPHAN) {
297 		error = dlm_user_adopt_orphan(ls, ua,
298 					 params->mode, params->flags,
299 					 params->name, params->namelen,
300 					 &lkid);
301 		if (!error)
302 			error = lkid;
303 	} else {
304 		error = dlm_user_request(ls, ua,
305 					 params->mode, params->flags,
306 					 params->name, params->namelen);
307 		if (!error)
308 			error = ua->lksb.sb_lkid;
309 	}
310  out:
311 	dlm_put_lockspace(ls);
312 	return error;
313 }
314 
315 static int device_user_unlock(struct dlm_user_proc *proc,
316 			      struct dlm_lock_params *params)
317 {
318 	struct dlm_ls *ls;
319 	struct dlm_user_args *ua;
320 	int error = -ENOMEM;
321 
322 	ls = dlm_find_lockspace_local(proc->lockspace);
323 	if (!ls)
324 		return -ENOENT;
325 
326 	ua = kzalloc(sizeof(struct dlm_user_args), GFP_NOFS);
327 	if (!ua)
328 		goto out;
329 	ua->proc = proc;
330 	ua->user_lksb = params->lksb;
331 	ua->castparam = params->castparam;
332 	ua->castaddr = params->castaddr;
333 
334 	if (params->flags & DLM_LKF_CANCEL)
335 		error = dlm_user_cancel(ls, ua, params->flags, params->lkid);
336 	else
337 		error = dlm_user_unlock(ls, ua, params->flags, params->lkid,
338 					params->lvb);
339  out:
340 	dlm_put_lockspace(ls);
341 	return error;
342 }
343 
344 static int device_user_deadlock(struct dlm_user_proc *proc,
345 				struct dlm_lock_params *params)
346 {
347 	struct dlm_ls *ls;
348 	int error;
349 
350 	ls = dlm_find_lockspace_local(proc->lockspace);
351 	if (!ls)
352 		return -ENOENT;
353 
354 	error = dlm_user_deadlock(ls, params->flags, params->lkid);
355 
356 	dlm_put_lockspace(ls);
357 	return error;
358 }
359 
360 static int dlm_device_register(struct dlm_ls *ls, char *name)
361 {
362 	int error, len;
363 
364 	/* The device is already registered.  This happens when the
365 	   lockspace is created multiple times from userspace. */
366 	if (ls->ls_device.name)
367 		return 0;
368 
369 	error = -ENOMEM;
370 	len = strlen(name) + strlen(name_prefix) + 2;
371 	ls->ls_device.name = kzalloc(len, GFP_NOFS);
372 	if (!ls->ls_device.name)
373 		goto fail;
374 
375 	snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix,
376 		 name);
377 	ls->ls_device.fops = &device_fops;
378 	ls->ls_device.minor = MISC_DYNAMIC_MINOR;
379 
380 	error = misc_register(&ls->ls_device);
381 	if (error) {
382 		kfree(ls->ls_device.name);
383 		/* this has to be set to NULL
384 		 * to avoid a double-free in dlm_device_deregister
385 		 */
386 		ls->ls_device.name = NULL;
387 	}
388 fail:
389 	return error;
390 }
391 
392 int dlm_device_deregister(struct dlm_ls *ls)
393 {
394 	/* The device is not registered.  This happens when the lockspace
395 	   was never used from userspace, or when device_create_lockspace()
396 	   calls dlm_release_lockspace() after the register fails. */
397 	if (!ls->ls_device.name)
398 		return 0;
399 
400 	misc_deregister(&ls->ls_device);
401 	kfree(ls->ls_device.name);
402 	return 0;
403 }
404 
405 static int device_user_purge(struct dlm_user_proc *proc,
406 			     struct dlm_purge_params *params)
407 {
408 	struct dlm_ls *ls;
409 	int error;
410 
411 	ls = dlm_find_lockspace_local(proc->lockspace);
412 	if (!ls)
413 		return -ENOENT;
414 
415 	error = dlm_user_purge(ls, proc, params->nodeid, params->pid);
416 
417 	dlm_put_lockspace(ls);
418 	return error;
419 }
420 
421 static int device_create_lockspace(struct dlm_lspace_params *params)
422 {
423 	dlm_lockspace_t *lockspace;
424 	struct dlm_ls *ls;
425 	int error;
426 
427 	if (!capable(CAP_SYS_ADMIN))
428 		return -EPERM;
429 
430 	error = dlm_new_user_lockspace(params->name, dlm_config.ci_cluster_name,
431 				       params->flags, DLM_USER_LVB_LEN, NULL,
432 				       NULL, NULL, &lockspace);
433 	if (error)
434 		return error;
435 
436 	ls = dlm_find_lockspace_local(lockspace);
437 	if (!ls)
438 		return -ENOENT;
439 
440 	error = dlm_device_register(ls, params->name);
441 	dlm_put_lockspace(ls);
442 
443 	if (error)
444 		dlm_release_lockspace(lockspace, 0);
445 	else
446 		error = ls->ls_device.minor;
447 
448 	return error;
449 }
450 
451 static int device_remove_lockspace(struct dlm_lspace_params *params)
452 {
453 	dlm_lockspace_t *lockspace;
454 	struct dlm_ls *ls;
455 	int error, force = 0;
456 
457 	if (!capable(CAP_SYS_ADMIN))
458 		return -EPERM;
459 
460 	ls = dlm_find_lockspace_device(params->minor);
461 	if (!ls)
462 		return -ENOENT;
463 
464 	if (params->flags & DLM_USER_LSFLG_FORCEFREE)
465 		force = 2;
466 
467 	lockspace = ls->ls_local_handle;
468 	dlm_put_lockspace(ls);
469 
470 	/* The final dlm_release_lockspace waits for references to go to
471 	   zero, so all processes will need to close their device for the
472 	   ls before the release will proceed.  release also calls the
473 	   device_deregister above.  Converting a positive return value
474 	   from release to zero means that userspace won't know when its
475 	   release was the final one, but it shouldn't need to know. */
476 
477 	error = dlm_release_lockspace(lockspace, force);
478 	if (error > 0)
479 		error = 0;
480 	return error;
481 }
482 
483 /* Check the user's version matches ours */
484 static int check_version(struct dlm_write_request *req)
485 {
486 	if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
487 	    (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
488 	     req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
489 
490 		printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
491 		       "user (%d.%d.%d) kernel (%d.%d.%d)\n",
492 		       current->comm,
493 		       task_pid_nr(current),
494 		       req->version[0],
495 		       req->version[1],
496 		       req->version[2],
497 		       DLM_DEVICE_VERSION_MAJOR,
498 		       DLM_DEVICE_VERSION_MINOR,
499 		       DLM_DEVICE_VERSION_PATCH);
500 		return -EINVAL;
501 	}
502 	return 0;
503 }
504 
505 /*
506  * device_write
507  *
508  *   device_user_lock
509  *     dlm_user_request -> request_lock
510  *     dlm_user_convert -> convert_lock
511  *
512  *   device_user_unlock
513  *     dlm_user_unlock -> unlock_lock
514  *     dlm_user_cancel -> cancel_lock
515  *
516  *   device_create_lockspace
517  *     dlm_new_lockspace
518  *
519  *   device_remove_lockspace
520  *     dlm_release_lockspace
521  */
522 
523 /* a write to a lockspace device is a lock or unlock request; a write
524    to the control device creates or removes a lockspace */
525 
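/*
 * Illustrative userspace sketch (an assumption for illustration, not part of
 * this file): a minimal lock request written to an already-open lockspace
 * device.  "fd", "my_lksb" and "my_ast" are hypothetical names; the layout
 * is struct dlm_write_request from linux/dlm_device.h, with the resource
 * name appended in i.lock.name.  On success, write() returns the new lock id
 * (see device_user_lock() above).
 *
 *	char buf[sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN] = { 0 };
 *	struct dlm_write_request *req = (struct dlm_write_request *)buf;
 *
 *	req->version[0] = DLM_DEVICE_VERSION_MAJOR;
 *	req->version[1] = DLM_DEVICE_VERSION_MINOR;
 *	req->version[2] = DLM_DEVICE_VERSION_PATCH;
 *	req->is64bit = (sizeof(long) == 8);
 *	req->cmd = DLM_USER_LOCK;
 *	req->i.lock.mode = DLM_LOCK_EX;
 *	req->i.lock.lksb = &my_lksb;        (must be non-NULL, see device_user_lock)
 *	req->i.lock.castaddr = my_ast;      (completion callback, must be non-NULL)
 *	req->i.lock.namelen = strlen("myres");
 *	memcpy(req->i.lock.name, "myres", req->i.lock.namelen);
 *	write(fd, buf, sizeof(*req) + req->i.lock.namelen);
 */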
526 static ssize_t device_write(struct file *file, const char __user *buf,
527 			    size_t count, loff_t *ppos)
528 {
529 	struct dlm_user_proc *proc = file->private_data;
530 	struct dlm_write_request *kbuf;
531 	int error;
532 
533 #ifdef CONFIG_COMPAT
534 	if (count < sizeof(struct dlm_write_request32))
535 #else
536 	if (count < sizeof(struct dlm_write_request))
537 #endif
538 		return -EINVAL;
539 
540 	/*
541 	 * can't compare against COMPAT/dlm_write_request32 because
542 	 * we don't yet know if is64bit is zero
543 	 */
544 	if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
545 		return -EINVAL;
546 
547 	kbuf = memdup_user_nul(buf, count);
548 	if (IS_ERR(kbuf))
549 		return PTR_ERR(kbuf);
550 
551 	if (check_version(kbuf)) {
552 		error = -EBADE;
553 		goto out_free;
554 	}
555 
556 #ifdef CONFIG_COMPAT
557 	if (!kbuf->is64bit) {
558 		struct dlm_write_request32 *k32buf;
559 		int namelen = 0;
560 
561 		if (count > sizeof(struct dlm_write_request32))
562 			namelen = count - sizeof(struct dlm_write_request32);
563 
564 		k32buf = (struct dlm_write_request32 *)kbuf;
565 
566 		/* add 1 after namelen so that the name string is terminated */
567 		kbuf = kzalloc(sizeof(struct dlm_write_request) + namelen + 1,
568 			       GFP_NOFS);
569 		if (!kbuf) {
570 			kfree(k32buf);
571 			return -ENOMEM;
572 		}
573 
574 		if (proc)
575 			set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags);
576 
577 		compat_input(kbuf, k32buf, namelen);
578 		kfree(k32buf);
579 	}
580 #endif
581 
582 	/* do we really need this? can a write happen after a close? */
583 	if ((kbuf->cmd == DLM_USER_LOCK || kbuf->cmd == DLM_USER_UNLOCK) &&
584 	    (proc && test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))) {
585 		error = -EINVAL;
586 		goto out_free;
587 	}
588 
589 	error = -EINVAL;
590 
591 	switch (kbuf->cmd)
592 	{
593 	case DLM_USER_LOCK:
594 		if (!proc) {
595 			log_print("no locking on control device");
596 			goto out_free;
597 		}
598 		error = device_user_lock(proc, &kbuf->i.lock);
599 		break;
600 
601 	case DLM_USER_UNLOCK:
602 		if (!proc) {
603 			log_print("no locking on control device");
604 			goto out_free;
605 		}
606 		error = device_user_unlock(proc, &kbuf->i.lock);
607 		break;
608 
609 	case DLM_USER_DEADLOCK:
610 		if (!proc) {
611 			log_print("no locking on control device");
612 			goto out_free;
613 		}
614 		error = device_user_deadlock(proc, &kbuf->i.lock);
615 		break;
616 
617 	case DLM_USER_CREATE_LOCKSPACE:
618 		if (proc) {
619 			log_print("create/remove only on control device");
620 			goto out_free;
621 		}
622 		error = device_create_lockspace(&kbuf->i.lspace);
623 		break;
624 
625 	case DLM_USER_REMOVE_LOCKSPACE:
626 		if (proc) {
627 			log_print("create/remove only on control device");
628 			goto out_free;
629 		}
630 		error = device_remove_lockspace(&kbuf->i.lspace);
631 		break;
632 
633 	case DLM_USER_PURGE:
634 		if (!proc) {
635 			log_print("no locking on control device");
636 			goto out_free;
637 		}
638 		error = device_user_purge(proc, &kbuf->i.purge);
639 		break;
640 
641 	default:
642 		log_print("Unknown command passed to DLM device: %d",
643 			  kbuf->cmd);
644 	}
645 
646  out_free:
647 	kfree(kbuf);
648 	return error;
649 }
650 
651 /* Every process that opens the lockspace device has its own "proc" structure
652    hanging off the open file that's used to keep track of locks owned by the
653    process and asts that need to be delivered to the process. */
654 
655 static int device_open(struct inode *inode, struct file *file)
656 {
657 	struct dlm_user_proc *proc;
658 	struct dlm_ls *ls;
659 
660 	ls = dlm_find_lockspace_device(iminor(inode));
661 	if (!ls)
662 		return -ENOENT;
663 
664 	proc = kzalloc(sizeof(struct dlm_user_proc), GFP_NOFS);
665 	if (!proc) {
666 		dlm_put_lockspace(ls);
667 		return -ENOMEM;
668 	}
669 
670 	proc->lockspace = ls->ls_local_handle;
671 	INIT_LIST_HEAD(&proc->asts);
672 	INIT_LIST_HEAD(&proc->locks);
673 	INIT_LIST_HEAD(&proc->unlocking);
674 	spin_lock_init(&proc->asts_spin);
675 	spin_lock_init(&proc->locks_spin);
676 	init_waitqueue_head(&proc->wait);
677 	file->private_data = proc;
678 
679 	return 0;
680 }
681 
682 static int device_close(struct inode *inode, struct file *file)
683 {
684 	struct dlm_user_proc *proc = file->private_data;
685 	struct dlm_ls *ls;
686 
687 	ls = dlm_find_lockspace_local(proc->lockspace);
688 	if (!ls)
689 		return -ENOENT;
690 
691 	set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags);
692 
693 	dlm_clear_proc_locks(ls, proc);
694 
695 	/* at this point no more lkb's should exist for this lockspace,
696 	   so there's no chance of dlm_user_add_ast() being called and
697 	   looking for lkb->ua->proc */
698 
699 	kfree(proc);
700 	file->private_data = NULL;
701 
702 	dlm_put_lockspace(ls);
703 	dlm_put_lockspace(ls);  /* for the find in device_open() */
704 
705 	/* FIXME: AUTOFREE: if this ls is no longer used do
706 	   device_remove_lockspace() */
707 
708 	return 0;
709 }
710 
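/* The buffer that copy_result_to_user() returns to the user starts with a
   struct dlm_lock_result (struct dlm_lock_result32 for compat callers).  If
   an updated LVB is copied as well, it is placed immediately after that
   fixed struct; result.lvb_offset gives its offset and result.length the
   total number of bytes in the result. */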
711 static int copy_result_to_user(struct dlm_user_args *ua, int compat,
712 			       uint32_t flags, int mode, int copy_lvb,
713 			       char __user *buf, size_t count)
714 {
715 #ifdef CONFIG_COMPAT
716 	struct dlm_lock_result32 result32;
717 #endif
718 	struct dlm_lock_result result;
719 	void *resultptr;
720 	int error = 0;
721 	int len;
722 	int struct_len;
723 
724 	memset(&result, 0, sizeof(struct dlm_lock_result));
725 	result.version[0] = DLM_DEVICE_VERSION_MAJOR;
726 	result.version[1] = DLM_DEVICE_VERSION_MINOR;
727 	result.version[2] = DLM_DEVICE_VERSION_PATCH;
728 	memcpy(&result.lksb, &ua->lksb, offsetof(struct dlm_lksb, sb_lvbptr));
729 	result.user_lksb = ua->user_lksb;
730 
731 	/* FIXME: dlm1 provides for the user's bastparam/addr to not be updated
732 	   in a conversion unless the conversion is successful.  See code
733 	   in dlm_user_convert() for updating ua from ua_tmp.  OpenVMS, though,
734 	   notes that a new blocking AST address and parameter are set even if
735 	   the conversion fails, so maybe we should just do that. */
736 
737 	if (flags & DLM_CB_BAST) {
738 		result.user_astaddr = ua->bastaddr;
739 		result.user_astparam = ua->bastparam;
740 		result.bast_mode = mode;
741 	} else {
742 		result.user_astaddr = ua->castaddr;
743 		result.user_astparam = ua->castparam;
744 	}
745 
746 #ifdef CONFIG_COMPAT
747 	if (compat)
748 		len = sizeof(struct dlm_lock_result32);
749 	else
750 #endif
751 		len = sizeof(struct dlm_lock_result);
752 	struct_len = len;
753 
754 	/* copy lvb to userspace if there is one, it's been updated, and
755 	   the user buffer has space for it */
756 
757 	if (copy_lvb && ua->lksb.sb_lvbptr && count >= len + DLM_USER_LVB_LEN) {
758 		if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
759 				 DLM_USER_LVB_LEN)) {
760 			error = -EFAULT;
761 			goto out;
762 		}
763 
764 		result.lvb_offset = len;
765 		len += DLM_USER_LVB_LEN;
766 	}
767 
768 	result.length = len;
769 	resultptr = &result;
770 #ifdef CONFIG_COMPAT
771 	if (compat) {
772 		compat_output(&result, &result32);
773 		resultptr = &result32;
774 	}
775 #endif
776 
777 	if (copy_to_user(buf, resultptr, struct_len))
778 		error = -EFAULT;
779 	else
780 		error = len;
781  out:
782 	return error;
783 }
784 
785 static int copy_version_to_user(char __user *buf, size_t count)
786 {
787 	struct dlm_device_version ver;
788 
789 	memset(&ver, 0, sizeof(struct dlm_device_version));
790 	ver.version[0] = DLM_DEVICE_VERSION_MAJOR;
791 	ver.version[1] = DLM_DEVICE_VERSION_MINOR;
792 	ver.version[2] = DLM_DEVICE_VERSION_PATCH;
793 
794 	if (copy_to_user(buf, &ver, sizeof(struct dlm_device_version)))
795 		return -EFAULT;
796 	return sizeof(struct dlm_device_version);
797 }
798 
799 /* a read returns a single ast described in a struct dlm_lock_result */
800 
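/*
 * Illustrative userspace sketch (an assumption, not part of this file): how
 * a program might consume one callback from the lockspace device.  Reading
 * exactly sizeof(struct dlm_device_version) bytes instead returns the
 * kernel's device version (see copy_version_to_user() above).  "fd",
 * "my_lvb" and "my_dispatch" are hypothetical names.
 *
 *	char rbuf[sizeof(struct dlm_lock_result) + DLM_USER_LVB_LEN];
 *	struct dlm_lock_result *res = (struct dlm_lock_result *)rbuf;
 *	ssize_t n = read(fd, rbuf, sizeof(rbuf));
 *
 *	if (n >= (ssize_t)sizeof(struct dlm_lock_result)) {
 *		if (res->lvb_offset)
 *			memcpy(my_lvb, rbuf + res->lvb_offset, DLM_USER_LVB_LEN);
 *		my_dispatch(res->user_astaddr, res->user_astparam, res->bast_mode);
 *	}
 */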
801 static ssize_t device_read(struct file *file, char __user *buf, size_t count,
802 			   loff_t *ppos)
803 {
804 	struct dlm_user_proc *proc = file->private_data;
805 	struct dlm_lkb *lkb;
806 	DECLARE_WAITQUEUE(wait, current);
807 	struct dlm_callback *cb;
808 	int rv, ret;
809 
810 	if (count == sizeof(struct dlm_device_version)) {
811 		rv = copy_version_to_user(buf, count);
812 		return rv;
813 	}
814 
815 	if (!proc) {
816 		log_print("non-version read from control device %zu", count);
817 		return -EINVAL;
818 	}
819 
820 #ifdef CONFIG_COMPAT
821 	if (count < sizeof(struct dlm_lock_result32))
822 #else
823 	if (count < sizeof(struct dlm_lock_result))
824 #endif
825 		return -EINVAL;
826 
827  try_another:
828 
829 	/* do we really need this? can a read happen after a close? */
830 	if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
831 		return -EINVAL;
832 
833 	spin_lock(&proc->asts_spin);
834 	if (list_empty(&proc->asts)) {
835 		if (file->f_flags & O_NONBLOCK) {
836 			spin_unlock(&proc->asts_spin);
837 			return -EAGAIN;
838 		}
839 
840 		add_wait_queue(&proc->wait, &wait);
841 
842 	repeat:
843 		set_current_state(TASK_INTERRUPTIBLE);
844 		if (list_empty(&proc->asts) && !signal_pending(current)) {
845 			spin_unlock(&proc->asts_spin);
846 			schedule();
847 			spin_lock(&proc->asts_spin);
848 			goto repeat;
849 		}
850 		set_current_state(TASK_RUNNING);
851 		remove_wait_queue(&proc->wait, &wait);
852 
853 		if (signal_pending(current)) {
854 			spin_unlock(&proc->asts_spin);
855 			return -ERESTARTSYS;
856 		}
857 	}
858 
859 	/* if we empty lkb_callbacks, we must not unlock the spinlock before
860 	   also removing the lkb from lkb_cb_list; that keeps an empty
861 	   lkb_cb_list consistent with an empty lkb_callbacks list */
862 
863 	lkb = list_first_entry(&proc->asts, struct dlm_lkb, lkb_cb_list);
864 
865 	rv = dlm_dequeue_lkb_callback(lkb, &cb);
866 	switch (rv) {
867 	case DLM_DEQUEUE_CALLBACK_EMPTY:
868 		/* this shouldn't happen; lkb should have been removed from
869 		 * list when last item was dequeued
870 		 */
871 		log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
872 		list_del_init(&lkb->lkb_cb_list);
873 		spin_unlock(&proc->asts_spin);
874 		/* removes ref for proc->asts, may cause lkb to be freed */
875 		dlm_put_lkb(lkb);
876 		WARN_ON_ONCE(1);
877 		goto try_another;
878 	case DLM_DEQUEUE_CALLBACK_LAST:
879 		list_del_init(&lkb->lkb_cb_list);
880 		clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
881 		break;
882 	case DLM_DEQUEUE_CALLBACK_SUCCESS:
883 		break;
884 	default:
885 		WARN_ON_ONCE(1);
886 		break;
887 	}
888 	spin_unlock(&proc->asts_spin);
889 
890 	if (cb->flags & DLM_CB_BAST) {
891 		trace_dlm_bast(lkb->lkb_resource->res_ls, lkb, cb->mode);
892 	} else if (cb->flags & DLM_CB_CAST) {
893 		lkb->lkb_lksb->sb_status = cb->sb_status;
894 		lkb->lkb_lksb->sb_flags = cb->sb_flags;
895 		trace_dlm_ast(lkb->lkb_resource->res_ls, lkb);
896 	}
897 
898 	ret = copy_result_to_user(lkb->lkb_ua,
899 				  test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
900 				  cb->flags, cb->mode, cb->copy_lvb, buf, count);
901 
902 	kref_put(&cb->ref, dlm_release_callback);
903 
904 	/* removes ref for proc->asts, may cause lkb to be freed */
905 	if (rv == DLM_DEQUEUE_CALLBACK_LAST)
906 		dlm_put_lkb(lkb);
907 
908 	return ret;
909 }
910 
911 static __poll_t device_poll(struct file *file, poll_table *wait)
912 {
913 	struct dlm_user_proc *proc = file->private_data;
914 
915 	poll_wait(file, &proc->wait, wait);
916 
917 	spin_lock(&proc->asts_spin);
918 	if (!list_empty(&proc->asts)) {
919 		spin_unlock(&proc->asts_spin);
920 		return EPOLLIN | EPOLLRDNORM;
921 	}
922 	spin_unlock(&proc->asts_spin);
923 	return 0;
924 }
925 
926 int dlm_user_daemon_available(void)
927 {
928 	/* dlm_controld hasn't started (or has started but hasn't properly
929 	   populated configfs) */
930 
931 	if (!dlm_our_nodeid())
932 		return 0;
933 
934 	/* This is to deal with versions of dlm_controld that don't
935 	   know about the monitor device.  We assume that if the
936 	   dlm_controld was started (above), but the monitor device
937 	   was never opened, that it's an old version.  dlm_controld
938 	   should open the monitor device before populating configfs. */
939 
940 	if (dlm_monitor_unused)
941 		return 1;
942 
943 	return atomic_read(&dlm_monitor_opened) ? 1 : 0;
944 }
945 
946 static int ctl_device_open(struct inode *inode, struct file *file)
947 {
948 	file->private_data = NULL;
949 	return 0;
950 }
951 
952 static int ctl_device_close(struct inode *inode, struct file *file)
953 {
954 	return 0;
955 }
956 
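/* The monitor device is how the kernel detects whether dlm_controld is
   running: the daemon is expected to open it before populating configfs
   (see dlm_user_daemon_available above), and when the last opener closes it
   (e.g. because the daemon exited), all lockspaces are stopped. */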
957 static int monitor_device_open(struct inode *inode, struct file *file)
958 {
959 	atomic_inc(&dlm_monitor_opened);
960 	dlm_monitor_unused = 0;
961 	return 0;
962 }
963 
964 static int monitor_device_close(struct inode *inode, struct file *file)
965 {
966 	if (atomic_dec_and_test(&dlm_monitor_opened))
967 		dlm_stop_lockspaces();
968 	return 0;
969 }
970 
971 static const struct file_operations device_fops = {
972 	.open    = device_open,
973 	.release = device_close,
974 	.read    = device_read,
975 	.write   = device_write,
976 	.poll    = device_poll,
977 	.owner   = THIS_MODULE,
978 	.llseek  = noop_llseek,
979 };
980 
981 static const struct file_operations ctl_device_fops = {
982 	.open    = ctl_device_open,
983 	.release = ctl_device_close,
984 	.read    = device_read,
985 	.write   = device_write,
986 	.owner   = THIS_MODULE,
987 	.llseek  = noop_llseek,
988 };
989 
990 static struct miscdevice ctl_device = {
991 	.name  = "dlm-control",
992 	.fops  = &ctl_device_fops,
993 	.minor = MISC_DYNAMIC_MINOR,
994 };
995 
996 static const struct file_operations monitor_device_fops = {
997 	.open    = monitor_device_open,
998 	.release = monitor_device_close,
999 	.owner   = THIS_MODULE,
1000 	.llseek  = noop_llseek,
1001 };
1002 
1003 static struct miscdevice monitor_device = {
1004 	.name  = "dlm-monitor",
1005 	.fops  = &monitor_device_fops,
1006 	.minor = MISC_DYNAMIC_MINOR,
1007 };
1008 
1009 int __init dlm_user_init(void)
1010 {
1011 	int error;
1012 
1013 	atomic_set(&dlm_monitor_opened, 0);
1014 
1015 	error = misc_register(&ctl_device);
1016 	if (error) {
1017 		log_print("misc_register failed for control device");
1018 		goto out;
1019 	}
1020 
1021 	error = misc_register(&monitor_device);
1022 	if (error) {
1023 		log_print("misc_register failed for monitor device");
1024 		misc_deregister(&ctl_device);
1025 	}
1026  out:
1027 	return error;
1028 }
1029 
1030 void dlm_user_exit(void)
1031 {
1032 	misc_deregister(&ctl_device);
1033 	misc_deregister(&monitor_device);
1034 }
1035 
1036