xref: /openbmc/linux/ipc/msg.c (revision c21b37f6)
1 /*
2  * linux/ipc/msg.c
3  * Copyright (C) 1992 Krishna Balasubramanian
4  *
5  * Removed all the remaining kerneld mess
6  * Catch the -EFAULT stuff properly
7  * Use GFP_KERNEL for messages as in 1.2
8  * Fixed up the unchecked user space derefs
9  * Copyright (C) 1998 Alan Cox & Andi Kleen
10  *
11  * /proc/sysvipc/msg support (c) 1999 Dragos Acostachioaie <dragos@iname.com>
12  *
13  * mostly rewritten, threaded and wake-one semantics added
14  * MSGMAX limit removed, sysctl's added
15  * (c) 1999 Manfred Spraul <manfred@colorfullife.com>
16  *
17  * support for audit of ipc object properties and permission changes
18  * Dustin Kirkland <dustin.kirkland@us.ibm.com>
19  *
20  * namespaces support
21  * OpenVZ, SWsoft Inc.
22  * Pavel Emelianov <xemul@openvz.org>
23  */
24 
25 #include <linux/capability.h>
26 #include <linux/slab.h>
27 #include <linux/msg.h>
28 #include <linux/spinlock.h>
29 #include <linux/init.h>
30 #include <linux/proc_fs.h>
31 #include <linux/list.h>
32 #include <linux/security.h>
33 #include <linux/sched.h>
34 #include <linux/syscalls.h>
35 #include <linux/audit.h>
36 #include <linux/seq_file.h>
37 #include <linux/mutex.h>
38 #include <linux/nsproxy.h>
39 
40 #include <asm/current.h>
41 #include <asm/uaccess.h>
42 #include "util.h"
43 
/*
 * One msg_receiver structure for each sleeping receiver.
 *
 * do_msgrcv() keeps one of these on its own stack, links it into
 * msq->q_receivers and sleeps.  pipelined_send() and expunge_all() walk
 * that list and hand a result back through r_msg.
 */
struct msg_receiver {
	struct list_head	r_list;		/* link in msq->q_receivers */
	struct task_struct	*r_tsk;		/* sleeping task; woken with wake_up_process() */

	int			r_mode;		/* one of the SEARCH_* modes */
	long			r_msgtype;	/* type argument that goes with r_mode */
	long			r_maxsize;	/* largest m_ts accepted (INT_MAX with MSG_NOERROR) */

	/*
	 * Result slot.  ERR_PTR(-EAGAIN) while still waiting; set to NULL
	 * while a waker is inside wake_up_process(); finally the delivered
	 * message or an ERR_PTR() error.
	 */
	struct msg_msg		*volatile r_msg;
};
57 
/*
 * One msg_sender for each sleeping sender.  Filled in by ss_add() and
 * linked into msq->q_senders.
 */
struct msg_sender {
	/* link in msq->q_senders; ss_wakeup(kill != 0) sets list.next to NULL */
	struct list_head	list;
	/* the sleeping sender, woken by ss_wakeup() */
	struct task_struct	*tsk;
};
63 
/* Matching modes evaluated by testmsg(); selected by convert_mode(). */
#define SEARCH_ANY		1	/* every message matches */
#define SEARCH_EQUAL		2	/* m_type == requested type */
#define SEARCH_NOTEQUAL		3	/* m_type != requested type */
#define SEARCH_LESSEQUAL	4	/* m_type <= requested type */
68 
/*
 * Running totals over all queued messages: payload bytes and message
 * count.  Updated on enqueue/dequeue/freeque and reported by MSG_INFO.
 */
static atomic_t msg_bytes =	ATOMIC_INIT(0);
static atomic_t msg_hdrs =	ATOMIC_INIT(0);

/* Id table that msg_init() attaches to init_ipc_ns. */
static struct ipc_ids init_msg_ids;
73 
/* The message-queue id table of namespace @ns (an lvalue). */
#define msg_ids(ns)	(*((ns)->ids[IPC_MSG_IDS]))

/*
 * Thin wrappers that apply the generic ipc_*() helpers to the
 * message-queue id table of a namespace.  Macro arguments that are
 * dereferenced are parenthesised so any pointer expression can be passed.
 */
#define msg_lock(ns, id)	((struct msg_queue*)ipc_lock(&msg_ids(ns), id))
#define msg_unlock(msq)		ipc_unlock(&(msq)->q_perm)
#define msg_rmid(ns, id)	((struct msg_queue*)ipc_rmid(&msg_ids(ns), id))
#define msg_checkid(ns, msq, msgid)	\
	ipc_checkid(&msg_ids(ns), &(msq)->q_perm, msgid)
#define msg_buildid(ns, id, seq) \
	ipc_buildid(&msg_ids(ns), id, seq)
83 
/* Forward declarations for helpers that are used before their definition. */
static void freeque (struct ipc_namespace *ns, struct msg_queue *msq, int id);
static int newque (struct ipc_namespace *ns, key_t key, int msgflg);
#ifdef CONFIG_PROC_FS
/* seq_file show callback registered by msg_init() for "sysvipc/msg". */
static int sysvipc_msg_proc_show(struct seq_file *s, void *it);
#endif
89 
90 static void __msg_init_ns(struct ipc_namespace *ns, struct ipc_ids *ids)
91 {
92 	ns->ids[IPC_MSG_IDS] = ids;
93 	ns->msg_ctlmax = MSGMAX;
94 	ns->msg_ctlmnb = MSGMNB;
95 	ns->msg_ctlmni = MSGMNI;
96 	ipc_init_ids(ids, ns->msg_ctlmni);
97 }
98 
99 int msg_init_ns(struct ipc_namespace *ns)
100 {
101 	struct ipc_ids *ids;
102 
103 	ids = kmalloc(sizeof(struct ipc_ids), GFP_KERNEL);
104 	if (ids == NULL)
105 		return -ENOMEM;
106 
107 	__msg_init_ns(ns, ids);
108 	return 0;
109 }
110 
111 void msg_exit_ns(struct ipc_namespace *ns)
112 {
113 	int i;
114 	struct msg_queue *msq;
115 
116 	mutex_lock(&msg_ids(ns).mutex);
117 	for (i = 0; i <= msg_ids(ns).max_id; i++) {
118 		msq = msg_lock(ns, i);
119 		if (msq == NULL)
120 			continue;
121 
122 		freeque(ns, msq, i);
123 	}
124 	mutex_unlock(&msg_ids(ns).mutex);
125 
126 	ipc_fini_ids(ns->ids[IPC_MSG_IDS]);
127 	kfree(ns->ids[IPC_MSG_IDS]);
128 	ns->ids[IPC_MSG_IDS] = NULL;
129 }
130 
131 void __init msg_init(void)
132 {
133 	__msg_init_ns(&init_ipc_ns, &init_msg_ids);
134 	ipc_init_proc_interface("sysvipc/msg",
135 				"       key      msqid perms      cbytes       qnum lspid lrpid   uid   gid  cuid  cgid      stime      rtime      ctime\n",
136 				IPC_MSG_IDS, sysvipc_msg_proc_show);
137 }
138 
139 static int newque (struct ipc_namespace *ns, key_t key, int msgflg)
140 {
141 	struct msg_queue *msq;
142 	int id, retval;
143 
144 	msq = ipc_rcu_alloc(sizeof(*msq));
145 	if (!msq)
146 		return -ENOMEM;
147 
148 	msq->q_perm.mode = msgflg & S_IRWXUGO;
149 	msq->q_perm.key = key;
150 
151 	msq->q_perm.security = NULL;
152 	retval = security_msg_queue_alloc(msq);
153 	if (retval) {
154 		ipc_rcu_putref(msq);
155 		return retval;
156 	}
157 
158 	id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
159 	if (id == -1) {
160 		security_msg_queue_free(msq);
161 		ipc_rcu_putref(msq);
162 		return -ENOSPC;
163 	}
164 
165 	msq->q_id = msg_buildid(ns, id, msq->q_perm.seq);
166 	msq->q_stime = msq->q_rtime = 0;
167 	msq->q_ctime = get_seconds();
168 	msq->q_cbytes = msq->q_qnum = 0;
169 	msq->q_qbytes = ns->msg_ctlmnb;
170 	msq->q_lspid = msq->q_lrpid = 0;
171 	INIT_LIST_HEAD(&msq->q_messages);
172 	INIT_LIST_HEAD(&msq->q_receivers);
173 	INIT_LIST_HEAD(&msq->q_senders);
174 	msg_unlock(msq);
175 
176 	return msq->q_id;
177 }
178 
179 static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
180 {
181 	mss->tsk = current;
182 	current->state = TASK_INTERRUPTIBLE;
183 	list_add_tail(&mss->list, &msq->q_senders);
184 }
185 
186 static inline void ss_del(struct msg_sender *mss)
187 {
188 	if (mss->list.next != NULL)
189 		list_del(&mss->list);
190 }
191 
192 static void ss_wakeup(struct list_head *h, int kill)
193 {
194 	struct list_head *tmp;
195 
196 	tmp = h->next;
197 	while (tmp != h) {
198 		struct msg_sender *mss;
199 
200 		mss = list_entry(tmp, struct msg_sender, list);
201 		tmp = tmp->next;
202 		if (kill)
203 			mss->list.next = NULL;
204 		wake_up_process(mss->tsk);
205 	}
206 }
207 
208 static void expunge_all(struct msg_queue *msq, int res)
209 {
210 	struct list_head *tmp;
211 
212 	tmp = msq->q_receivers.next;
213 	while (tmp != &msq->q_receivers) {
214 		struct msg_receiver *msr;
215 
216 		msr = list_entry(tmp, struct msg_receiver, r_list);
217 		tmp = tmp->next;
218 		msr->r_msg = NULL;
219 		wake_up_process(msr->r_tsk);
220 		smp_mb();
221 		msr->r_msg = ERR_PTR(res);
222 	}
223 }
224 
225 /*
226  * freeque() wakes up waiters on the sender and receiver waiting queue,
227  * removes the message queue from message queue ID
228  * array, and cleans up all the messages associated with this queue.
229  *
230  * msg_ids.mutex and the spinlock for this message queue is hold
231  * before freeque() is called. msg_ids.mutex remains locked on exit.
232  */
233 static void freeque(struct ipc_namespace *ns, struct msg_queue *msq, int id)
234 {
235 	struct list_head *tmp;
236 
237 	expunge_all(msq, -EIDRM);
238 	ss_wakeup(&msq->q_senders, 1);
239 	msq = msg_rmid(ns, id);
240 	msg_unlock(msq);
241 
242 	tmp = msq->q_messages.next;
243 	while (tmp != &msq->q_messages) {
244 		struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
245 
246 		tmp = tmp->next;
247 		atomic_dec(&msg_hdrs);
248 		free_msg(msg);
249 	}
250 	atomic_sub(msq->q_cbytes, &msg_bytes);
251 	security_msg_queue_free(msq);
252 	ipc_rcu_putref(msq);
253 }
254 
255 asmlinkage long sys_msgget(key_t key, int msgflg)
256 {
257 	struct msg_queue *msq;
258 	int id, ret = -EPERM;
259 	struct ipc_namespace *ns;
260 
261 	ns = current->nsproxy->ipc_ns;
262 
263 	mutex_lock(&msg_ids(ns).mutex);
264 	if (key == IPC_PRIVATE)
265 		ret = newque(ns, key, msgflg);
266 	else if ((id = ipc_findkey(&msg_ids(ns), key)) == -1) { /* key not used */
267 		if (!(msgflg & IPC_CREAT))
268 			ret = -ENOENT;
269 		else
270 			ret = newque(ns, key, msgflg);
271 	} else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {
272 		ret = -EEXIST;
273 	} else {
274 		msq = msg_lock(ns, id);
275 		BUG_ON(msq == NULL);
276 		if (ipcperms(&msq->q_perm, msgflg))
277 			ret = -EACCES;
278 		else {
279 			int qid = msg_buildid(ns, id, msq->q_perm.seq);
280 
281 			ret = security_msg_queue_associate(msq, msgflg);
282 			if (!ret)
283 				ret = qid;
284 		}
285 		msg_unlock(msq);
286 	}
287 	mutex_unlock(&msg_ids(ns).mutex);
288 
289 	return ret;
290 }
291 
292 static inline unsigned long
293 copy_msqid_to_user(void __user *buf, struct msqid64_ds *in, int version)
294 {
295 	switch(version) {
296 	case IPC_64:
297 		return copy_to_user(buf, in, sizeof(*in));
298 	case IPC_OLD:
299 	{
300 		struct msqid_ds out;
301 
302 		memset(&out, 0, sizeof(out));
303 
304 		ipc64_perm_to_ipc_perm(&in->msg_perm, &out.msg_perm);
305 
306 		out.msg_stime		= in->msg_stime;
307 		out.msg_rtime		= in->msg_rtime;
308 		out.msg_ctime		= in->msg_ctime;
309 
310 		if (in->msg_cbytes > USHRT_MAX)
311 			out.msg_cbytes	= USHRT_MAX;
312 		else
313 			out.msg_cbytes	= in->msg_cbytes;
314 		out.msg_lcbytes		= in->msg_cbytes;
315 
316 		if (in->msg_qnum > USHRT_MAX)
317 			out.msg_qnum	= USHRT_MAX;
318 		else
319 			out.msg_qnum	= in->msg_qnum;
320 
321 		if (in->msg_qbytes > USHRT_MAX)
322 			out.msg_qbytes	= USHRT_MAX;
323 		else
324 			out.msg_qbytes	= in->msg_qbytes;
325 		out.msg_lqbytes		= in->msg_qbytes;
326 
327 		out.msg_lspid		= in->msg_lspid;
328 		out.msg_lrpid		= in->msg_lrpid;
329 
330 		return copy_to_user(buf, &out, sizeof(out));
331 	}
332 	default:
333 		return -EINVAL;
334 	}
335 }
336 
/*
 * The subset of a user-supplied msqid_ds/msqid64_ds that IPC_SET consumes;
 * filled in by copy_msqid_from_user().
 */
struct msq_setbuf {
	unsigned long	qbytes;	/* requested queue byte limit */
	uid_t		uid;	/* requested owner uid */
	gid_t		gid;	/* requested owner gid */
	mode_t		mode;	/* requested mode; only the S_IRWXUGO bits are applied */
};
343 
344 static inline unsigned long
345 copy_msqid_from_user(struct msq_setbuf *out, void __user *buf, int version)
346 {
347 	switch(version) {
348 	case IPC_64:
349 	{
350 		struct msqid64_ds tbuf;
351 
352 		if (copy_from_user(&tbuf, buf, sizeof(tbuf)))
353 			return -EFAULT;
354 
355 		out->qbytes		= tbuf.msg_qbytes;
356 		out->uid		= tbuf.msg_perm.uid;
357 		out->gid		= tbuf.msg_perm.gid;
358 		out->mode		= tbuf.msg_perm.mode;
359 
360 		return 0;
361 	}
362 	case IPC_OLD:
363 	{
364 		struct msqid_ds tbuf_old;
365 
366 		if (copy_from_user(&tbuf_old, buf, sizeof(tbuf_old)))
367 			return -EFAULT;
368 
369 		out->uid		= tbuf_old.msg_perm.uid;
370 		out->gid		= tbuf_old.msg_perm.gid;
371 		out->mode		= tbuf_old.msg_perm.mode;
372 
373 		if (tbuf_old.msg_qbytes == 0)
374 			out->qbytes	= tbuf_old.msg_lqbytes;
375 		else
376 			out->qbytes	= tbuf_old.msg_qbytes;
377 
378 		return 0;
379 	}
380 	default:
381 		return -EINVAL;
382 	}
383 }
384 
/*
 * msgctl(2): query or modify the message queue identified by @msqid.
 *
 * IPC_INFO / MSG_INFO: copy a struct msginfo to @buf and return
 *	msg_ids(ns).max_id, or 0 if that is negative.
 * MSG_STAT / IPC_STAT: copy the queue status to @buf.  MSG_STAT returns
 *	the id built from @msqid and the queue's sequence number; IPC_STAT
 *	verifies @msqid with msg_checkid() and returns 0.
 * IPC_SET: apply qbytes/uid/gid/mode read from @buf, then wake sleeping
 *	receivers (with -EAGAIN) and senders so they re-evaluate.
 * IPC_RMID: destroy the queue via freeque().
 *
 * IPC_SET and IPC_RMID run under msg_ids(ns).mutex and require the caller
 * to be the creator, the owner, or to hold CAP_SYS_ADMIN.
 *
 * Returns a non-negative value on success, a negative errno on failure.
 */
asmlinkage long sys_msgctl(int msqid, int cmd, struct msqid_ds __user *buf)
{
	struct kern_ipc_perm *ipcp;
	struct msq_setbuf uninitialized_var(setbuf);
	struct msg_queue *msq;
	int err, version;
	struct ipc_namespace *ns;

	if (msqid < 0 || cmd < 0)
		return -EINVAL;

	/* cmd is passed by address; version selects the user-space layout. */
	version = ipc_parse_version(&cmd);
	ns = current->nsproxy->ipc_ns;

	switch (cmd) {
	case IPC_INFO:
	case MSG_INFO:
	{
		struct msginfo msginfo;
		int max_id;

		if (!buf)
			return -EFAULT;
		err = security_msg_queue_msgctl(NULL, cmd);
		if (err)
			return err;

		/*
		 * We must not return kernel stack data.
		 * due to padding, it's not enough
		 * to set all member fields.
		 */
		memset(&msginfo, 0, sizeof(msginfo));
		msginfo.msgmni = ns->msg_ctlmni;
		msginfo.msgmax = ns->msg_ctlmax;
		msginfo.msgmnb = ns->msg_ctlmnb;
		msginfo.msgssz = MSGSSZ;
		msginfo.msgseg = MSGSEG;
		mutex_lock(&msg_ids(ns).mutex);
		if (cmd == MSG_INFO) {
			/* MSG_INFO reports current usage instead of limits. */
			msginfo.msgpool = msg_ids(ns).in_use;
			msginfo.msgmap = atomic_read(&msg_hdrs);
			msginfo.msgtql = atomic_read(&msg_bytes);
		} else {
			msginfo.msgmap = MSGMAP;
			msginfo.msgpool = MSGPOOL;
			msginfo.msgtql = MSGTQL;
		}
		max_id = msg_ids(ns).max_id;
		mutex_unlock(&msg_ids(ns).mutex);
		if (copy_to_user(buf, &msginfo, sizeof(struct msginfo)))
			return -EFAULT;
		return (max_id < 0) ? 0 : max_id;
	}
	case MSG_STAT:
	case IPC_STAT:
	{
		struct msqid64_ds tbuf;
		int success_return;

		if (!buf)
			return -EFAULT;
		/* For MSG_STAT, msqid is an index into the id table. */
		if (cmd == MSG_STAT && msqid >= msg_ids(ns).entries->size)
			return -EINVAL;

		/* Zeroed up front so padding never leaks to user space. */
		memset(&tbuf, 0, sizeof(tbuf));

		msq = msg_lock(ns, msqid);
		if (msq == NULL)
			return -EINVAL;

		if (cmd == MSG_STAT) {
			success_return = msg_buildid(ns, msqid, msq->q_perm.seq);
		} else {
			err = -EIDRM;
			if (msg_checkid(ns, msq, msqid))
				goto out_unlock;
			success_return = 0;
		}
		err = -EACCES;
		if (ipcperms(&msq->q_perm, S_IRUGO))
			goto out_unlock;

		err = security_msg_queue_msgctl(msq, cmd);
		if (err)
			goto out_unlock;

		/* Snapshot under the queue lock; copy out after dropping it. */
		kernel_to_ipc64_perm(&msq->q_perm, &tbuf.msg_perm);
		tbuf.msg_stime  = msq->q_stime;
		tbuf.msg_rtime  = msq->q_rtime;
		tbuf.msg_ctime  = msq->q_ctime;
		tbuf.msg_cbytes = msq->q_cbytes;
		tbuf.msg_qnum   = msq->q_qnum;
		tbuf.msg_qbytes = msq->q_qbytes;
		tbuf.msg_lspid  = msq->q_lspid;
		tbuf.msg_lrpid  = msq->q_lrpid;
		msg_unlock(msq);
		if (copy_msqid_to_user(buf, &tbuf, version))
			return -EFAULT;
		return success_return;
	}
	case IPC_SET:
		if (!buf)
			return -EFAULT;
		/* Read the user data before any lock is taken. */
		if (copy_msqid_from_user(&setbuf, buf, version))
			return -EFAULT;
		break;
	case IPC_RMID:
		break;
	default:
		return  -EINVAL;
	}

	/* IPC_SET and IPC_RMID only from here on. */
	mutex_lock(&msg_ids(ns).mutex);
	msq = msg_lock(ns, msqid);
	err = -EINVAL;
	if (msq == NULL)
		goto out_up;

	err = -EIDRM;
	if (msg_checkid(ns, msq, msqid))
		goto out_unlock_up;
	ipcp = &msq->q_perm;

	err = audit_ipc_obj(ipcp);
	if (err)
		goto out_unlock_up;
	if (cmd == IPC_SET) {
		err = audit_ipc_set_perm(setbuf.qbytes, setbuf.uid, setbuf.gid,
					 setbuf.mode);
		if (err)
			goto out_unlock_up;
	}

	err = -EPERM;
	if (current->euid != ipcp->cuid &&
	    current->euid != ipcp->uid && !capable(CAP_SYS_ADMIN))
		/* We _could_ check for CAP_CHOWN above, but we don't */
		goto out_unlock_up;

	err = security_msg_queue_msgctl(msq, cmd);
	if (err)
		goto out_unlock_up;

	switch (cmd) {
	case IPC_SET:
	{
		/* Raising the limit above msg_ctlmnb needs CAP_SYS_RESOURCE. */
		err = -EPERM;
		if (setbuf.qbytes > ns->msg_ctlmnb && !capable(CAP_SYS_RESOURCE))
			goto out_unlock_up;

		msq->q_qbytes = setbuf.qbytes;

		ipcp->uid = setbuf.uid;
		ipcp->gid = setbuf.gid;
		/* Only the rwx permission bits are taken from the caller. */
		ipcp->mode = (ipcp->mode & ~S_IRWXUGO) |
			     (S_IRWXUGO & setbuf.mode);
		msq->q_ctime = get_seconds();
		/* sleeping receivers might be excluded by
		 * stricter permissions.
		 */
		expunge_all(msq, -EAGAIN);
		/* sleeping senders might be able to send
		 * due to a larger queue size.
		 */
		ss_wakeup(&msq->q_senders, 0);
		msg_unlock(msq);
		break;
	}
	case IPC_RMID:
		/* freeque() drops the queue lock itself. */
		freeque(ns, msq, msqid);
		break;
	}
	err = 0;
out_up:
	mutex_unlock(&msg_ids(ns).mutex);
	return err;
out_unlock_up:
	/* Error with both the queue lock and the mutex held. */
	msg_unlock(msq);
	goto out_up;
out_unlock:
	/* Error on the *_STAT path: only the queue lock is held. */
	msg_unlock(msq);
	return err;
}
569 
570 static int testmsg(struct msg_msg *msg, long type, int mode)
571 {
572 	switch(mode)
573 	{
574 		case SEARCH_ANY:
575 			return 1;
576 		case SEARCH_LESSEQUAL:
577 			if (msg->m_type <=type)
578 				return 1;
579 			break;
580 		case SEARCH_EQUAL:
581 			if (msg->m_type == type)
582 				return 1;
583 			break;
584 		case SEARCH_NOTEQUAL:
585 			if (msg->m_type != type)
586 				return 1;
587 			break;
588 	}
589 	return 0;
590 }
591 
592 static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
593 {
594 	struct list_head *tmp;
595 
596 	tmp = msq->q_receivers.next;
597 	while (tmp != &msq->q_receivers) {
598 		struct msg_receiver *msr;
599 
600 		msr = list_entry(tmp, struct msg_receiver, r_list);
601 		tmp = tmp->next;
602 		if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
603 		    !security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
604 					       msr->r_msgtype, msr->r_mode)) {
605 
606 			list_del(&msr->r_list);
607 			if (msr->r_maxsize < msg->m_ts) {
608 				msr->r_msg = NULL;
609 				wake_up_process(msr->r_tsk);
610 				smp_mb();
611 				msr->r_msg = ERR_PTR(-E2BIG);
612 			} else {
613 				msr->r_msg = NULL;
614 				msq->q_lrpid = msr->r_tsk->pid;
615 				msq->q_rtime = get_seconds();
616 				wake_up_process(msr->r_tsk);
617 				smp_mb();
618 				msr->r_msg = msg;
619 
620 				return 1;
621 			}
622 		}
623 	}
624 	return 0;
625 }
626 
/*
 * Send a message of type @mtype with @msgsz bytes of payload, read from
 * user pointer @mtext, to the queue identified by @msqid.
 *
 * The message is built with load_msg() before any lock is taken.  If the
 * queue is full the caller sleeps interruptibly until there is room,
 * unless IPC_NOWAIT is set in @msgflg.  The message is then either handed
 * straight to a waiting receiver or appended to the queue.
 *
 * Returns 0 on success or a negative errno: -EINVAL (bad size, id or
 * type), the load_msg() error, -EIDRM, -EACCES, the security hook error,
 * -EAGAIN (full with IPC_NOWAIT) or -ERESTARTNOHAND (signal while waiting).
 */
long do_msgsnd(int msqid, long mtype, void __user *mtext,
		size_t msgsz, int msgflg)
{
	struct msg_queue *msq;
	struct msg_msg *msg;
	int err;
	struct ipc_namespace *ns;

	ns = current->nsproxy->ipc_ns;

	if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
		return -EINVAL;
	if (mtype < 1)
		return -EINVAL;

	msg = load_msg(mtext, msgsz);
	if (IS_ERR(msg))
		return PTR_ERR(msg);

	msg->m_type = mtype;
	msg->m_ts = msgsz;

	msq = msg_lock(ns, msqid);
	err = -EINVAL;
	if (msq == NULL)
		goto out_free;

	err= -EIDRM;
	if (msg_checkid(ns, msq, msqid))
		goto out_unlock_free;

	/* Each pass re-checks permissions, as they may change while asleep. */
	for (;;) {
		struct msg_sender s;

		err = -EACCES;
		if (ipcperms(&msq->q_perm, S_IWUGO))
			goto out_unlock_free;

		err = security_msg_queue_msgsnd(msq, msg, msgflg);
		if (err)
			goto out_unlock_free;

		/*
		 * Room check: the new byte total and the new message count
		 * must both stay within q_qbytes.
		 */
		if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
				1 + msq->q_qnum <= msq->q_qbytes) {
			break;
		}

		/* queue full, wait: */
		if (msgflg & IPC_NOWAIT) {
			err = -EAGAIN;
			goto out_unlock_free;
		}
		ss_add(msq, &s);
		/* Hold a reference across the unlocked sleep. */
		ipc_rcu_getref(msq);
		msg_unlock(msq);
		schedule();

		ipc_lock_by_ptr(&msq->q_perm);
		ipc_rcu_putref(msq);
		if (msq->q_perm.deleted) {
			/* Queue was removed while we slept. */
			err = -EIDRM;
			goto out_unlock_free;
		}
		ss_del(&s);

		if (signal_pending(current)) {
			err = -ERESTARTNOHAND;
			goto out_unlock_free;
		}
	}

	msq->q_lspid = current->tgid;
	msq->q_stime = get_seconds();

	if (!pipelined_send(msq, msg)) {
		/* no one is waiting for this message, enqueue it */
		list_add_tail(&msg->m_list, &msq->q_messages);
		msq->q_cbytes += msgsz;
		msq->q_qnum++;
		atomic_add(msgsz, &msg_bytes);
		atomic_inc(&msg_hdrs);
	}

	err = 0;
	/* Ownership of msg has passed on; do not free it below. */
	msg = NULL;

out_unlock_free:
	msg_unlock(msq);
out_free:
	if (msg != NULL)
		free_msg(msg);
	return err;
}
720 
721 asmlinkage long
722 sys_msgsnd(int msqid, struct msgbuf __user *msgp, size_t msgsz, int msgflg)
723 {
724 	long mtype;
725 
726 	if (get_user(mtype, &msgp->mtype))
727 		return -EFAULT;
728 	return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
729 }
730 
731 static inline int convert_mode(long *msgtyp, int msgflg)
732 {
733 	/*
734 	 *  find message of correct type.
735 	 *  msgtyp = 0 => get first.
736 	 *  msgtyp > 0 => get first message of matching type.
737 	 *  msgtyp < 0 => get message with least type must be < abs(msgtype).
738 	 */
739 	if (*msgtyp == 0)
740 		return SEARCH_ANY;
741 	if (*msgtyp < 0) {
742 		*msgtyp = -*msgtyp;
743 		return SEARCH_LESSEQUAL;
744 	}
745 	if (msgflg & MSG_EXCEPT)
746 		return SEARCH_NOTEQUAL;
747 	return SEARCH_EQUAL;
748 }
749 
/*
 * Receive one message from the queue identified by @msqid.
 *
 * @pmtype receives the type of the delivered message, @mtext is the user
 * buffer for the payload and @msgsz its capacity.  @msgtyp and @msgflg
 * select the message as described at convert_mode().
 *
 * If no suitable message is queued and IPC_NOWAIT is not set, the caller
 * sleeps on the receiver list until pipelined_send() or expunge_all()
 * fills in r_msg, or until a signal arrives.
 *
 * Returns the number of payload bytes delivered (at most @msgsz), or a
 * negative errno: -EINVAL, -EIDRM, -EACCES, -E2BIG, -ENOMSG,
 * -ERESTARTNOHAND, -EFAULT if storing to @mtext fails, or an error handed
 * over by the waker.  The message is freed once it has been dequeued,
 * whether or not the copy to user space succeeded.
 */
long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
		size_t msgsz, long msgtyp, int msgflg)
{
	struct msg_queue *msq;
	struct msg_msg *msg;
	int mode;
	struct ipc_namespace *ns;

	if (msqid < 0 || (long) msgsz < 0)
		return -EINVAL;
	mode = convert_mode(&msgtyp, msgflg);
	ns = current->nsproxy->ipc_ns;

	msq = msg_lock(ns, msqid);
	if (msq == NULL)
		return -EINVAL;

	msg = ERR_PTR(-EIDRM);
	if (msg_checkid(ns, msq, msqid))
		goto out_unlock;

	for (;;) {
		struct msg_receiver msr_d;
		struct list_head *tmp;

		msg = ERR_PTR(-EACCES);
		if (ipcperms(&msq->q_perm, S_IRUGO))
			goto out_unlock;

		/* -EAGAIN doubles as the "nothing found yet" marker. */
		msg = ERR_PTR(-EAGAIN);
		tmp = msq->q_messages.next;
		while (tmp != &msq->q_messages) {
			struct msg_msg *walk_msg;

			walk_msg = list_entry(tmp, struct msg_msg, m_list);
			if (testmsg(walk_msg, msgtyp, mode) &&
			    !security_msg_queue_msgrcv(msq, walk_msg, current,
						       msgtyp, mode)) {

				msg = walk_msg;
				/*
				 * For SEARCH_LESSEQUAL keep scanning for a
				 * still lower type: tighten the bound to one
				 * below the candidate.  Type 1 cannot be
				 * beaten, so stop there.
				 */
				if (mode == SEARCH_LESSEQUAL &&
						walk_msg->m_type != 1) {
					msg = walk_msg;
					msgtyp = walk_msg->m_type - 1;
				} else {
					msg = walk_msg;
					break;
				}
			}
			tmp = tmp->next;
		}
		if (!IS_ERR(msg)) {
			/*
			 * Found a suitable message.
			 * Unlink it from the queue.
			 */
			if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
				msg = ERR_PTR(-E2BIG);
				goto out_unlock;
			}
			list_del(&msg->m_list);
			msq->q_qnum--;
			msq->q_rtime = get_seconds();
			msq->q_lrpid = current->tgid;
			msq->q_cbytes -= msg->m_ts;
			atomic_sub(msg->m_ts, &msg_bytes);
			atomic_dec(&msg_hdrs);
			/* Room was freed: let sleeping senders retry. */
			ss_wakeup(&msq->q_senders, 0);
			msg_unlock(msq);
			break;
		}
		/* No message waiting. Wait for a message */
		if (msgflg & IPC_NOWAIT) {
			msg = ERR_PTR(-ENOMSG);
			goto out_unlock;
		}
		list_add_tail(&msr_d.r_list, &msq->q_receivers);
		msr_d.r_tsk = current;
		msr_d.r_msgtype = msgtyp;
		msr_d.r_mode = mode;
		if (msgflg & MSG_NOERROR)
			msr_d.r_maxsize = INT_MAX;
		else
			msr_d.r_maxsize = msgsz;
		msr_d.r_msg = ERR_PTR(-EAGAIN);
		current->state = TASK_INTERRUPTIBLE;
		msg_unlock(msq);

		schedule();

		/* Lockless receive, part 1:
		 * Disable preemption.  We don't hold a reference to the queue
		 * and getting a reference would defeat the idea of a lockless
		 * operation, thus the code relies on rcu to guarantee the
		 * existence of msq:
		 * Prior to destruction, expunge_all(-EIDRM) changes r_msg.
		 * Thus if r_msg is -EAGAIN, then the queue is not yet destroyed.
		 * rcu_read_lock() prevents preemption between reading r_msg
		 * and the spin_lock() inside ipc_lock_by_ptr().
		 */
		rcu_read_lock();

		/* Lockless receive, part 2:
		 * Wait until pipelined_send or expunge_all are outside of
		 * wake_up_process(). There is a race with exit(), see
		 * ipc/mqueue.c for the details.
		 */
		msg = (struct msg_msg*)msr_d.r_msg;
		while (msg == NULL) {
			cpu_relax();
			msg = (struct msg_msg *)msr_d.r_msg;
		}

		/* Lockless receive, part 3:
		 * If there is a message or an error then accept it without
		 * locking.
		 */
		if (msg != ERR_PTR(-EAGAIN)) {
			rcu_read_unlock();
			break;
		}

		/* Lockless receive, part 4:
		 * Acquire the queue spinlock.
		 */
		ipc_lock_by_ptr(&msq->q_perm);
		rcu_read_unlock();

		/* Lockless receive, part 5:
		 * Repeat test after acquiring the spinlock.
		 */
		msg = (struct msg_msg*)msr_d.r_msg;
		if (msg != ERR_PTR(-EAGAIN))
			goto out_unlock;

		/* Still waiting: unlink, then retry unless a signal is pending. */
		list_del(&msr_d.r_list);
		if (signal_pending(current)) {
			msg = ERR_PTR(-ERESTARTNOHAND);
			/* Shared exit for every path that leaves with the queue locked. */
out_unlock:
			msg_unlock(msq);
			break;
		}
	}
	if (IS_ERR(msg))
		return PTR_ERR(msg);

	/* Deliver at most msgsz bytes (truncation was allowed by MSG_NOERROR). */
	msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
	*pmtype = msg->m_type;
	if (store_msg(mtext, msg, msgsz))
		msgsz = -EFAULT;

	free_msg(msg);

	return msgsz;
}
905 
906 asmlinkage long sys_msgrcv(int msqid, struct msgbuf __user *msgp, size_t msgsz,
907 			   long msgtyp, int msgflg)
908 {
909 	long err, mtype;
910 
911 	err =  do_msgrcv(msqid, &mtype, msgp->mtext, msgsz, msgtyp, msgflg);
912 	if (err < 0)
913 		goto out;
914 
915 	if (put_user(mtype, &msgp->mtype))
916 		err = -EFAULT;
917 out:
918 	return err;
919 }
920 
#ifdef CONFIG_PROC_FS
/*
 * seq_file show callback for /proc/sysvipc/msg: print one line describing
 * the message queue passed in @it.  Returns the seq_printf() result.
 */
static int sysvipc_msg_proc_show(struct seq_file *s, void *it)
{
	struct msg_queue *queue = it;
	struct kern_ipc_perm *perm = &queue->q_perm;

	return seq_printf(s,
			"%10d %10d  %4o  %10lu %10lu %5u %5u %5u %5u %5u %5u %10lu %10lu %10lu\n",
			perm->key,
			queue->q_id,
			perm->mode,
			queue->q_cbytes,
			queue->q_qnum,
			queue->q_lspid,
			queue->q_lrpid,
			perm->uid,
			perm->gid,
			perm->cuid,
			perm->cgid,
			queue->q_stime,
			queue->q_rtime,
			queue->q_ctime);
}
#endif
944