xref: /openbmc/linux/net/rds/recv.c (revision da2ef666)
/*
 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>
#include <linux/time.h>
#include <linux/rds.h>

#include "rds.h"

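/*
 * Initialize a freshly allocated rds_incoming for a message arriving on
 * @conn from @saddr: take the initial reference and clear the RDMA cookie,
 * receive timestamp and rx latency trace slots.
 */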
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
		 struct in6_addr *saddr)
{
	int i;

	refcount_set(&inc->i_refcount, 1);
	INIT_LIST_HEAD(&inc->i_item);
	inc->i_conn = conn;
	inc->i_saddr = *saddr;
	inc->i_rdma_cookie = 0;
	inc->i_rx_tstamp.tv_sec = 0;
	inc->i_rx_tstamp.tv_usec = 0;

	for (i = 0; i < RDS_RX_MAX_TRACES; i++)
		inc->i_rx_lat_trace[i] = 0;
}
EXPORT_SYMBOL_GPL(rds_inc_init);

void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
		       struct in6_addr *saddr)
{
	refcount_set(&inc->i_refcount, 1);
	INIT_LIST_HEAD(&inc->i_item);
	inc->i_conn = cp->cp_conn;
	inc->i_conn_path = cp;
	inc->i_saddr = *saddr;
	inc->i_rdma_cookie = 0;
	inc->i_rx_tstamp.tv_sec = 0;
	inc->i_rx_tstamp.tv_usec = 0;
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);

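/*
 * A queued rds_incoming is pinned by its refcount; the final rds_inc_put()
 * must only happen once the inc is off every list, at which point it is
 * handed back to the owning transport via ->inc_free().
 */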
static void rds_inc_addref(struct rds_incoming *inc)
{
	rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
	refcount_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
	rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
	if (refcount_dec_and_test(&inc->i_refcount)) {
		BUG_ON(!list_empty(&inc->i_item));

		inc->i_conn->c_trans->inc_free(inc);
	}
}
EXPORT_SYMBOL_GPL(rds_inc_put);

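/*
 * Account @delta bytes moved onto (or off) the socket's receive queue and
 * update the local congestion map: set the port's bit once rs_rcv_bytes
 * exceeds the socket's receive buffer, and clear it again only after the
 * queue has drained below half the buffer.
 */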
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
				  struct rds_cong_map *map,
				  int delta, __be16 port)
{
	int now_congested;

	if (delta == 0)
		return;

	rs->rs_rcv_bytes += delta;
	if (delta > 0)
		rds_stats_add(s_recv_bytes_added_to_socket, delta);
	else
		rds_stats_add(s_recv_bytes_removed_from_socket, -delta);

	/* loop transport doesn't send/recv congestion updates */
	if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
		return;

	now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

	rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
	  "now_cong %d delta %d\n",
	  rs, &rs->rs_bound_addr,
	  ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
	  rds_sk_rcvbuf(rs), now_congested, delta);

	/* wasn't -> am congested */
	if (!rs->rs_congested && now_congested) {
		rs->rs_congested = 1;
		rds_cong_set_bit(map, port);
		rds_cong_queue_updates(map);
	}
	/* was -> aren't congested */
	/* Require more free space before reporting uncongested to prevent
	   bouncing cong/uncong state too often */
	else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
		rs->rs_congested = 0;
		rds_cong_clear_bit(map, port);
		rds_cong_queue_updates(map);
	}

	/* do nothing if no change in cong state */
}

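/*
 * The peer advertises a generation number in its handshake extension
 * headers (TCP transport only, hence the WARN_ON below). A change in that
 * number presumably means the peer restarted: reset each path's tx/rx
 * sequence counters and flag everything still sitting on the retransmit
 * queues with RDS_MSG_FLUSH.
 */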
static void rds_conn_peer_gen_update(struct rds_connection *conn,
				     u32 peer_gen_num)
{
	int i;
	struct rds_message *rm, *tmp;
	unsigned long flags;

	WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
	if (peer_gen_num != 0) {
		if (conn->c_peer_gen_num != 0 &&
		    peer_gen_num != conn->c_peer_gen_num) {
			for (i = 0; i < RDS_MPATH_WORKERS; i++) {
				struct rds_conn_path *cp;

				cp = &conn->c_path[i];
				spin_lock_irqsave(&cp->cp_lock, flags);
				cp->cp_next_tx_seq = 1;
				cp->cp_next_rx_seq = 0;
				list_for_each_entry_safe(rm, tmp,
							 &cp->cp_retrans,
							 m_conn_item) {
					set_bit(RDS_MSG_FLUSH, &rm->m_flags);
				}
				spin_unlock_irqrestore(&cp->cp_lock, flags);
			}
		}
		conn->c_peer_gen_num = peer_gen_num;
	}
}

/*
 * Process all extension headers that come with this message.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
	struct rds_header *hdr = &inc->i_hdr;
	unsigned int pos = 0, type, len;
	union {
		struct rds_ext_header_version version;
		struct rds_ext_header_rdma rdma;
		struct rds_ext_header_rdma_dest rdma_dest;
	} buffer;

	while (1) {
		len = sizeof(buffer);
		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDS_EXTHDR_NONE)
			break;
		/* Process extension header here */
		switch (type) {
		case RDS_EXTHDR_RDMA:
			rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
			break;

		case RDS_EXTHDR_RDMA_DEST:
			/* We ignore the size for now. We could stash it
			 * somewhere and use it for error checking. */
			inc->i_rdma_cookie = rds_rdma_make_cookie(
					be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
					be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

			break;
		}
	}
}

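/*
 * Parse the extension headers carried by a handshake ping/pong: the peer's
 * advertised number of paths (RDS_EXTHDR_NPATHS) and its generation number
 * (RDS_EXTHDR_GEN_NUM).
 */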
static void rds_recv_hs_exthdrs(struct rds_header *hdr,
				struct rds_connection *conn)
{
	unsigned int pos = 0, type, len;
	union {
		struct rds_ext_header_version version;
		u16 rds_npaths;
		u32 rds_gen_num;
	} buffer;
	u32 new_peer_gen_num = 0;

	while (1) {
		len = sizeof(buffer);
		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
		if (type == RDS_EXTHDR_NONE)
			break;
		/* Process extension header here */
		switch (type) {
		case RDS_EXTHDR_NPATHS:
			conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
					       be16_to_cpu(buffer.rds_npaths));
			break;
		case RDS_EXTHDR_GEN_NUM:
			new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
			break;
		default:
			pr_warn_ratelimited("ignoring unknown exthdr type "
					     "0x%x\n", type);
		}
	}
	/* if RDS_EXTHDR_NPATHS was not found, default to a single path */
	conn->c_npaths = max_t(int, conn->c_npaths, 1);
	conn->c_ping_triggered = 0;
	rds_conn_peer_gen_update(conn, new_peer_gen_num);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 * The scheme is based on the following rules:
 *
 * 1. rds_sendmsg on the first connect attempt sends the probe ping, with the
 *    sender's npaths (s_npaths).
 * 2. The receiver of the probe-ping knows mprds_paths = min(s_npaths, r_npaths).
 *    It sends back a probe-pong with r_npaths. After that, if the receiver is
 *    the smaller IP address, it starts rds_conn_path_connect_if_down on all
 *    mprds_paths.
 * 3. The sender gets woken up and can move to rds_conn_path_connect_if_down.
 *    If it is the smaller IP address, rds_conn_path_connect_if_down can be
 *    called on all mprds_paths after reception of the probe-pong.
 *    Otherwise (the sender of the probe-ping is not the smaller IP address):
 *    just call rds_conn_path_connect_if_down on the hashed path (see rule 4).
 * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 * 5. The sender may end up queuing the packet on the cp; it will be sent out
 *    later, when the connection is completed.
 */
static void rds_start_mprds(struct rds_connection *conn)
{
	int i;
	struct rds_conn_path *cp;

	if (conn->c_npaths > 1 &&
	    rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
		for (i = 0; i < conn->c_npaths; i++) {
			cp = &conn->c_path[i];
			rds_conn_path_connect_if_down(cp);
		}
	}
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow, which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure, and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only done to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting them from the
 * conn.  This lets loopback, which only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
		       struct in6_addr *daddr,
		       struct rds_incoming *inc, gfp_t gfp)
{
	struct rds_sock *rs = NULL;
	struct sock *sk;
	unsigned long flags;
	struct rds_conn_path *cp;

	inc->i_conn = conn;
	inc->i_rx_jiffies = jiffies;
	if (conn->c_trans->t_mp_capable)
		cp = inc->i_conn_path;
	else
		cp = &conn->c_path[0];

	rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
		 "flags 0x%x rx_jiffies %lu\n", conn,
		 (unsigned long long)cp->cp_next_rx_seq,
		 inc,
		 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
		 be32_to_cpu(inc->i_hdr.h_len),
		 be16_to_cpu(inc->i_hdr.h_sport),
		 be16_to_cpu(inc->i_hdr.h_dport),
		 inc->i_hdr.h_flags,
		 inc->i_rx_jiffies);

	/*
	 * Sequence numbers should only increase.  Messages get their
	 * sequence number as they're queued in a sending conn.  They
	 * can be dropped, though, if the sending socket is closed before
	 * they hit the wire.  So sequence numbers can skip forward
	 * under normal operation.  They can also drop back in the conn
	 * failover case as previously sent messages are resent down the
	 * new instance of a conn.  We drop those, otherwise we have
	 * to assume that the next valid seq does not come after a
	 * hole in the fragment stream.
	 *
	 * The headers don't give us a way to tell whether fragments of
	 * a message have been dropped.  We assume that frags that arrive
	 * to a flow are part of the current message on the flow that is
	 * being reassembled.  This means that senders can't drop messages
	 * from the sending conn until all their frags are sent.
	 *
	 * XXX we could spend more on the wire to get more robust failure
	 * detection, arguably worth it to avoid data corruption.
	 */
	if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
	    (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
		rds_stats_inc(s_recv_drop_old_seq);
		goto out;
	}
	cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

	if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
		if (inc->i_hdr.h_sport == 0) {
			rdsdebug("ignore ping with 0 sport from %pI6c\n",
				 saddr);
			goto out;
		}
		rds_stats_inc(s_recv_ping);
		rds_send_pong(cp, inc->i_hdr.h_sport);
		/* if this is a handshake ping, start multipath if necessary */
		if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
				 be16_to_cpu(inc->i_hdr.h_dport))) {
			rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
			rds_start_mprds(cp->cp_conn);
		}
		goto out;
	}

	if (be16_to_cpu(inc->i_hdr.h_dport) == RDS_FLAG_PROBE_PORT &&
	    inc->i_hdr.h_sport == 0) {
		rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
		/* if this is a handshake pong, start multipath if necessary */
		rds_start_mprds(cp->cp_conn);
		wake_up(&cp->cp_conn->c_hs_waitq);
		goto out;
	}

	rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
	if (!rs) {
		rds_stats_inc(s_recv_drop_no_sock);
		goto out;
	}

	/* Process extension headers */
	rds_recv_incoming_exthdrs(inc, rs);

	/* We can be racing with rds_release() which marks the socket dead. */
	sk = rds_rs_to_sk(rs);

	/* serialize with rds_release -> sock_orphan */
	write_lock_irqsave(&rs->rs_recv_lock, flags);
	if (!sock_flag(sk, SOCK_DEAD)) {
		rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
		rds_stats_inc(s_recv_queued);
		rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
				      be32_to_cpu(inc->i_hdr.h_len),
				      inc->i_hdr.h_dport);
		if (sock_flag(sk, SOCK_RCVTSTAMP))
			do_gettimeofday(&inc->i_rx_tstamp);
		rds_inc_addref(inc);
		inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
		__rds_wake_sk_sleep(sk);
	} else {
		rds_stats_inc(s_recv_drop_dead_sock);
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
	if (rs)
		rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * Be careful here.  This is called as the condition in wait_event_*() and
 * so needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
	unsigned long flags;

	if (!*inc) {
		read_lock_irqsave(&rs->rs_recv_lock, flags);
		if (!list_empty(&rs->rs_recv_queue)) {
			*inc = list_entry(rs->rs_recv_queue.next,
					  struct rds_incoming,
					  i_item);
			rds_inc_addref(*inc);
		}
		read_unlock_irqrestore(&rs->rs_recv_lock, flags);
	}

	return *inc != NULL;
}

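/*
 * Check whether @inc is still on the socket's receive queue.  If it is and
 * @drop is set, pull it off the queue, give back the rcvbuf space it was
 * consuming and drop the queue's reference to it.
 */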
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
			    int drop)
{
	struct sock *sk = rds_rs_to_sk(rs);
	int ret = 0;
	unsigned long flags;

	write_lock_irqsave(&rs->rs_recv_lock, flags);
	if (!list_empty(&inc->i_item)) {
		ret = 1;
		if (drop) {
			/* XXX make sure this i_conn is reliable */
			rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
					      -be32_to_cpu(inc->i_hdr.h_len),
					      inc->i_hdr.h_dport);
			list_del_init(&inc->i_item);
			rds_inc_put(inc);
		}
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);

	rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
	return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
	struct rds_notifier *notifier;
	struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
	unsigned int count = 0, max_messages = ~0U;
	unsigned long flags;
	LIST_HEAD(copy);
	int err = 0;

	/* put_cmsg copies to user space and thus may sleep. We can't do this
	 * with rs_lock held, so first grab as many notifications as we can stuff
	 * in the user provided cmsg buffer. We don't try to copy more, to avoid
	 * losing notifications - except when the buffer is so small that it
	 * wouldn't even hold a single notification. Then we give the caller as
	 * much of this single message as we can squeeze in, and set MSG_CTRUNC.
	 */
	if (msghdr) {
		max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
		if (!max_messages)
			max_messages = 1;
	}

	spin_lock_irqsave(&rs->rs_lock, flags);
	while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
		notifier = list_entry(rs->rs_notify_queue.next,
				struct rds_notifier, n_list);
		list_move(&notifier->n_list, &copy);
		count++;
	}
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (!count)
		return 0;

	while (!list_empty(&copy)) {
		notifier = list_entry(copy.next, struct rds_notifier, n_list);

		if (msghdr) {
			cmsg.user_token = notifier->n_user_token;
			cmsg.status = notifier->n_status;

			err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
				       sizeof(cmsg), &cmsg);
			if (err)
				break;
		}

		list_del_init(&notifier->n_list);
		kfree(notifier);
	}

	/* If we bailed out because of an error in put_cmsg,
	 * we may be left with one or more notifications that we
	 * didn't process. Return them to the head of the list. */
	if (!list_empty(&copy)) {
		spin_lock_irqsave(&rs->rs_lock, flags);
		list_splice(&copy, &rs->rs_notify_queue);
		spin_unlock_irqrestore(&rs->rs_lock, flags);
	}

	return err;
}

/*
 * Queue a congestion notification
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
	uint64_t notify = rs->rs_cong_notify;
	unsigned long flags;
	int err;

	err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
			sizeof(notify), &notify);
	if (err)
		return err;

	spin_lock_irqsave(&rs->rs_lock, flags);
	rs->rs_cong_notify &= ~notify;
	spin_unlock_irqrestore(&rs->rs_lock, flags);

	return 0;
}

/*
 * Receive any control messages.
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
			 struct rds_sock *rs)
{
	int ret = 0;

	if (inc->i_rdma_cookie) {
		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
				sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
		if (ret)
			goto out;
	}

	if ((inc->i_rx_tstamp.tv_sec != 0) &&
	    sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
		ret = put_cmsg(msg, SOL_SOCKET, SCM_TIMESTAMP,
			       sizeof(struct timeval),
			       &inc->i_rx_tstamp);
		if (ret)
			goto out;
	}

	if (rs->rs_rx_traces) {
		struct rds_cmsg_rx_trace t;
		int i, j;

		memset(&t, 0, sizeof(t));
		inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
		t.rx_traces = rs->rs_rx_traces;
		for (i = 0; i < rs->rs_rx_traces; i++) {
			j = rs->rs_rx_trace[i];
			t.rx_trace_pos[i] = j;
			t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
					  inc->i_rx_lat_trace[j];
		}

		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
			       sizeof(t), &t);
		if (ret)
			goto out;
	}

out:
	return ret;
}

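/*
 * If the socket has zerocopy enabled and there is room in the control
 * buffer, pop one batch of zerocopy completion cookies off the socket's
 * queue and deliver it as an RDS_CMSG_ZCOPY_COMPLETION cmsg.  On put_cmsg
 * failure the batch is put back so nothing is lost.
 */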
static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
	struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
	struct rds_msg_zcopy_info *info = NULL;
	struct rds_zcopy_cookies *done;
	unsigned long flags;

	if (!msg->msg_control)
		return false;

	if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
		return false;

	spin_lock_irqsave(&q->lock, flags);
	if (!list_empty(&q->zcookie_head)) {
		info = list_entry(q->zcookie_head.next,
				  struct rds_msg_zcopy_info, rs_zcookie_next);
		list_del(&info->rs_zcookie_next);
	}
	spin_unlock_irqrestore(&q->lock, flags);
	if (!info)
		return false;
	done = &info->zcookies;
	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
		     done)) {
		spin_lock_irqsave(&q->lock, flags);
		list_add(&info->rs_zcookie_next, &q->zcookie_head);
		spin_unlock_irqrestore(&q->lock, flags);
		return false;
	}
	kfree(info);
	return true;
}

int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		int msg_flags)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	long timeo;
	int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
	struct rds_incoming *inc = NULL;

	/* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
	timeo = sock_rcvtimeo(sk, nonblock);

	rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

	if (msg_flags & MSG_OOB)
		goto out;
	if (msg_flags & MSG_ERRQUEUE)
		return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

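	/*
	 * Receive loop: deliver any pending RDMA or congestion notifications
	 * first, then hand the next queued incoming message to the caller,
	 * sleeping (bounded by timeo) when nothing is available and the
	 * caller didn't ask for a non-blocking receive.
	 */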
	while (1) {
		/* If there are pending notifications, do those - and nothing else */
		if (!list_empty(&rs->rs_notify_queue)) {
			ret = rds_notify_queue_get(rs, msg);
			break;
		}

		if (rs->rs_cong_notify) {
			ret = rds_notify_cong(rs, msg);
			break;
		}

		if (!rds_next_incoming(rs, &inc)) {
			if (nonblock) {
				bool reaped = rds_recvmsg_zcookie(rs, msg);

				ret = reaped ? 0 : -EAGAIN;
				break;
			}

			timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					(!list_empty(&rs->rs_notify_queue) ||
					 rs->rs_cong_notify ||
					 rds_next_incoming(rs, &inc)), timeo);
			rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
				 timeo);
			if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
				continue;

			ret = timeo;
			if (ret == 0)
				ret = -ETIMEDOUT;
			break;
		}

		rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
			 &inc->i_conn->c_faddr,
			 ntohs(inc->i_hdr.h_sport));
		ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
		if (ret < 0)
			break;

		/*
		 * if the message we just copied isn't at the head of the
		 * recv queue then someone else raced us to return it, try
		 * to get the next message.
		 */
		if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
			rds_inc_put(inc);
			inc = NULL;
			rds_stats_inc(s_recv_deliver_raced);
			iov_iter_revert(&msg->msg_iter, ret);
			continue;
		}

		if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
			if (msg_flags & MSG_TRUNC)
				ret = be32_to_cpu(inc->i_hdr.h_len);
			msg->msg_flags |= MSG_TRUNC;
		}

		if (rds_cmsg_recv(inc, msg, rs)) {
			ret = -EFAULT;
			goto out;
		}
		rds_recvmsg_zcookie(rs, msg);

		rds_stats_inc(s_recv_delivered);

		if (msg->msg_name) {
			if (ipv6_addr_v4mapped(&inc->i_saddr)) {
				sin = (struct sockaddr_in *)msg->msg_name;

				sin->sin_family = AF_INET;
				sin->sin_port = inc->i_hdr.h_sport;
				sin->sin_addr.s_addr =
				    inc->i_saddr.s6_addr32[3];
				memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
				msg->msg_namelen = sizeof(*sin);
			} else {
				sin6 = (struct sockaddr_in6 *)msg->msg_name;

				sin6->sin6_family = AF_INET6;
				sin6->sin6_port = inc->i_hdr.h_sport;
				sin6->sin6_addr = inc->i_saddr;
				sin6->sin6_flowinfo = 0;
				sin6->sin6_scope_id = rs->rs_bound_scope_id;
				msg->msg_namelen = sizeof(*sin6);
			}
		}
		break;
	}

	if (inc)
		rds_inc_put(inc);

out:
	return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
	struct sock *sk = rds_rs_to_sk(rs);
	struct rds_incoming *inc, *tmp;
	unsigned long flags;

	write_lock_irqsave(&rs->rs_recv_lock, flags);
	list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
		rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
				      -be32_to_cpu(inc->i_hdr.h_len),
				      inc->i_hdr.h_dport);
		list_del_init(&inc->i_item);
		rds_inc_put(inc);
	}
	write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void rds_inc_info_copy(struct rds_incoming *inc,
		       struct rds_info_iterator *iter,
		       __be32 saddr, __be32 daddr, int flip)
{
	struct rds_info_message minfo;

	minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
	minfo.len = be32_to_cpu(inc->i_hdr.h_len);

	if (flip) {
		minfo.laddr = daddr;
		minfo.faddr = saddr;
		minfo.lport = inc->i_hdr.h_dport;
		minfo.fport = inc->i_hdr.h_sport;
	} else {
		minfo.laddr = saddr;
		minfo.faddr = daddr;
		minfo.lport = inc->i_hdr.h_sport;
		minfo.fport = inc->i_hdr.h_dport;
	}

	minfo.flags = 0;

	rds_info_copy(iter, &minfo, sizeof(minfo));
}

#if IS_ENABLED(CONFIG_IPV6)
void rds6_inc_info_copy(struct rds_incoming *inc,
			struct rds_info_iterator *iter,
			struct in6_addr *saddr, struct in6_addr *daddr,
			int flip)
{
	struct rds6_info_message minfo6;

	minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
	minfo6.len = be32_to_cpu(inc->i_hdr.h_len);

	if (flip) {
		minfo6.laddr = *daddr;
		minfo6.faddr = *saddr;
		minfo6.lport = inc->i_hdr.h_dport;
		minfo6.fport = inc->i_hdr.h_sport;
	} else {
		minfo6.laddr = *saddr;
		minfo6.faddr = *daddr;
		minfo6.lport = inc->i_hdr.h_sport;
		minfo6.fport = inc->i_hdr.h_dport;
	}

	rds_info_copy(iter, &minfo6, sizeof(minfo6));
}
#endif