xref: /openbmc/linux/net/rds/send.c (revision 675aaf05)
1  /*
2   * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
3   *
4   * This software is available to you under a choice of one of two
5   * licenses.  You may choose to be licensed under the terms of the GNU
6   * General Public License (GPL) Version 2, available from the file
7   * COPYING in the main directory of this source tree, or the
8   * OpenIB.org BSD license below:
9   *
10   *     Redistribution and use in source and binary forms, with or
11   *     without modification, are permitted provided that the following
12   *     conditions are met:
13   *
14   *      - Redistributions of source code must retain the above
15   *        copyright notice, this list of conditions and the following
16   *        disclaimer.
17   *
18   *      - Redistributions in binary form must reproduce the above
19   *        copyright notice, this list of conditions and the following
20   *        disclaimer in the documentation and/or other materials
21   *        provided with the distribution.
22   *
23   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24   * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25   * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26   * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27   * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28   * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29   * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30   * SOFTWARE.
31   *
32   */
33  #include <linux/kernel.h>
34  #include <linux/moduleparam.h>
35  #include <linux/gfp.h>
36  #include <net/sock.h>
37  #include <linux/in.h>
38  #include <linux/list.h>
39  #include <linux/ratelimit.h>
40  #include <linux/export.h>
41  #include <linux/sizes.h>
42  
43  #include "rds.h"
44  
45  /* When transmitting messages in rds_send_xmit, we need to emerge from
46   * time to time and briefly release the CPU. Otherwise the softlockup watchdog
47   * will kick our shin.
48   * Also, it seems fairer to not let one busy connection stall all the
49   * others.
50   *
51   * send_batch_count is the number of times we'll loop in send_xmit. Setting
52   * it to 0 will restore the old behavior (where we looped until we had
53   * drained the queue).
54   */
55  static int send_batch_count = SZ_1K;
56  module_param(send_batch_count, int, 0444);
57  MODULE_PARM_DESC(send_batch_count, "batch factor when working the send queue");
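/*
 * With perm 0444 this parameter is read-only at runtime; with the usual
 * module_param() behaviour it is visible under
 * /sys/module/rds/parameters/send_batch_count and can only be changed
 * when the module is loaded (e.g. via a modprobe option or, for a
 * built-in kernel, a "rds.send_batch_count=" boot parameter).
 */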
58  
59  static void rds_send_remove_from_sock(struct list_head *messages, int status);
60  
61  /*
62   * Reset the send state.  Callers must ensure that this doesn't race with
63   * rds_send_xmit().
64   */
65  void rds_send_path_reset(struct rds_conn_path *cp)
66  {
67  	struct rds_message *rm, *tmp;
68  	unsigned long flags;
69  
70  	if (cp->cp_xmit_rm) {
71  		rm = cp->cp_xmit_rm;
72  		cp->cp_xmit_rm = NULL;
73  		/* Tell the user the RDMA op is no longer mapped by the
74  		 * transport. This isn't entirely true (it's flushed out
75  		 * independently) but as the connection is down, there's
76  		 * no ongoing RDMA to/from that memory */
77  		rds_message_unmapped(rm);
78  		rds_message_put(rm);
79  	}
80  
81  	cp->cp_xmit_sg = 0;
82  	cp->cp_xmit_hdr_off = 0;
83  	cp->cp_xmit_data_off = 0;
84  	cp->cp_xmit_atomic_sent = 0;
85  	cp->cp_xmit_rdma_sent = 0;
86  	cp->cp_xmit_data_sent = 0;
87  
88  	cp->cp_conn->c_map_queued = 0;
89  
90  	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
91  	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
92  
93  	/* Mark messages as retransmissions, and move them to the send q */
94  	spin_lock_irqsave(&cp->cp_lock, flags);
95  	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
96  		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
97  		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
98  	}
99  	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
100  	spin_unlock_irqrestore(&cp->cp_lock, flags);
101  }
102  EXPORT_SYMBOL_GPL(rds_send_path_reset);
103  
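/*
 * RDS_IN_XMIT serializes transmit on a connection path: only the task
 * that wins the test_and_set_bit() below may run rds_send_xmit() for
 * this path until it calls release_in_xmit().
 */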
104  static int acquire_in_xmit(struct rds_conn_path *cp)
105  {
106  	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
107  }
108  
109  static void release_in_xmit(struct rds_conn_path *cp)
110  {
111  	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
112  	smp_mb__after_atomic();
113  	/*
114  	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
115  	 * hot path and finding waiters is very rare.  We don't want to walk
116  	 * the system-wide hashed waitqueue buckets in the fast path only to
117  	 * almost never find waiters.
118  	 */
119  	if (waitqueue_active(&cp->cp_waitq))
120  		wake_up_all(&cp->cp_waitq);
121  }
122  
123  /*
124   * We're making the conscious trade-off here to only send one message
125   * down the connection at a time.
126   *   Pro:
127   *      - tx queueing is a simple fifo list
128   *      - reassembly is optional and easily done by transports per conn
129   *      - no per flow rx lookup at all, straight to the socket
130   *      - less per-frag memory and wire overhead
131   *   Con:
132   *      - queued acks can be delayed behind large messages
133   *   Depends:
134   *      - small message latency is higher behind queued large messages
135   *      - large message latency isn't starved by intervening small sends
136   */
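/*
 * Pull messages off cp_send_queue and feed their rdma, atomic and data
 * ops to the transport until the queue drains, the transport stops
 * making progress, or we have looped send_batch_count times.
 */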
137  int rds_send_xmit(struct rds_conn_path *cp)
138  {
139  	struct rds_connection *conn = cp->cp_conn;
140  	struct rds_message *rm;
141  	unsigned long flags;
142  	unsigned int tmp;
143  	struct scatterlist *sg;
144  	int ret = 0;
145  	LIST_HEAD(to_be_dropped);
146  	int batch_count;
147  	unsigned long send_gen = 0;
148  
149  restart:
150  	batch_count = 0;
151  
152  	/*
153  	 * sendmsg calls here after having queued its message on the send
154  	 * queue.  We only have one task feeding the connection at a time.  If
155  	 * another thread is already feeding the queue then we back off.  This
156  	 * avoids blocking the caller and trading per-connection data between
157  	 * caches per message.
158  	 */
159  	if (!acquire_in_xmit(cp)) {
160  		rds_stats_inc(s_send_lock_contention);
161  		ret = -ENOMEM;
162  		goto out;
163  	}
164  
165  	if (rds_destroy_pending(cp->cp_conn)) {
166  		release_in_xmit(cp);
167  		ret = -ENETUNREACH; /* don't requeue send work */
168  		goto out;
169  	}
170  
171  	/*
172  	 * we record the send generation after doing the xmit acquire.
173  	 * if someone else manages to jump in and do some work, we'll use
174  	 * this to avoid a goto restart farther down.
175  	 *
176  	 * The acquire_in_xmit() check above ensures that only one
177  	 * caller can increment c_send_gen at any time.
178  	 */
179  	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
180  	WRITE_ONCE(cp->cp_send_gen, send_gen);
181  
182  	/*
183  	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
184  	 * we do the opposite to avoid races.
185  	 */
186  	if (!rds_conn_path_up(cp)) {
187  		release_in_xmit(cp);
188  		ret = 0;
189  		goto out;
190  	}
191  
192  	if (conn->c_trans->xmit_path_prepare)
193  		conn->c_trans->xmit_path_prepare(cp);
194  
195  	/*
196  	 * spin trying to push headers and data down the connection until
197  	 * the connection doesn't make forward progress.
198  	 */
199  	while (1) {
200  
201  		rm = cp->cp_xmit_rm;
202  
203  		/*
204  		 * If we are between sending messages, we can send a pending congestion
205  		 * map update.
206  		 */
207  		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
208  			rm = rds_cong_update_alloc(conn);
209  			if (IS_ERR(rm)) {
210  				ret = PTR_ERR(rm);
211  				break;
212  			}
213  			rm->data.op_active = 1;
214  			rm->m_inc.i_conn_path = cp;
215  			rm->m_inc.i_conn = cp->cp_conn;
216  
217  			cp->cp_xmit_rm = rm;
218  		}
219  
220  		/*
221  		 * If not already working on one, grab the next message.
222  		 *
223  		 * cp_xmit_rm holds a ref while we're sending this message down
224  		 * the connection.  We can use this ref while holding the
225  		 * RDS_IN_XMIT bit; rds_send_path_reset() is serialized with it.
226  		 */
227  		if (!rm) {
228  			unsigned int len;
229  
230  			batch_count++;
231  
232  			/* we want to process as big a batch as we can, but
233  			 * we also want to avoid softlockups.  If we've been
234  			 * through a lot of messages, let's back off and see
235  			 * if anyone else jumps in.
236  			 */
237  			if (batch_count >= send_batch_count)
238  				goto over_batch;
239  
240  			spin_lock_irqsave(&cp->cp_lock, flags);
241  
242  			if (!list_empty(&cp->cp_send_queue)) {
243  				rm = list_entry(cp->cp_send_queue.next,
244  						struct rds_message,
245  						m_conn_item);
246  				rds_message_addref(rm);
247  
248  				/*
249  				 * Move the message from the send queue to the retransmit
250  				 * list right away.
251  				 */
252  				list_move_tail(&rm->m_conn_item,
253  					       &cp->cp_retrans);
254  			}
255  
256  			spin_unlock_irqrestore(&cp->cp_lock, flags);
257  
258  			if (!rm)
259  				break;
260  
261  			/* Unfortunately, the way Infiniband deals with
262  			 * RDMA to a bad MR key is by moving the entire
263  			 * queue pair to error state. We could possibly
264  			 * recover from that, but right now we drop the
265  			 * connection.
266  			 * Therefore, we never retransmit messages with RDMA ops.
267  			 */
268  			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
269  			    (rm->rdma.op_active &&
270  			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
271  				spin_lock_irqsave(&cp->cp_lock, flags);
272  				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
273  					list_move(&rm->m_conn_item, &to_be_dropped);
274  				spin_unlock_irqrestore(&cp->cp_lock, flags);
275  				continue;
276  			}
277  
278  			/* Require an ACK every once in a while */
279  			len = ntohl(rm->m_inc.i_hdr.h_len);
280  			if (cp->cp_unacked_packets == 0 ||
281  			    cp->cp_unacked_bytes < len) {
282  				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
283  
284  				cp->cp_unacked_packets =
285  					rds_sysctl_max_unacked_packets;
286  				cp->cp_unacked_bytes =
287  					rds_sysctl_max_unacked_bytes;
288  				rds_stats_inc(s_send_ack_required);
289  			} else {
290  				cp->cp_unacked_bytes -= len;
291  				cp->cp_unacked_packets--;
292  			}
293  
294  			cp->cp_xmit_rm = rm;
295  		}
296  
297  		/* The transport either sends the whole rdma or none of it */
298  		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
299  			rm->m_final_op = &rm->rdma;
300  			/* The transport owns the mapped memory for now.
301  			 * You can't unmap it while it's on the send queue
302  			 */
303  			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
304  			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
305  			if (ret) {
306  				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
307  				wake_up_interruptible(&rm->m_flush_wait);
308  				break;
309  			}
310  			cp->cp_xmit_rdma_sent = 1;
311  
312  		}
313  
314  		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
315  			rm->m_final_op = &rm->atomic;
316  			/* The transport owns the mapped memory for now.
317  			 * You can't unmap it while it's on the send queue
318  			 */
319  			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
320  			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
321  			if (ret) {
322  				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
323  				wake_up_interruptible(&rm->m_flush_wait);
324  				break;
325  			}
326  			cp->cp_xmit_atomic_sent = 1;
327  
328  		}
329  
330  		/*
331  		 * A number of cases require an RDS header to be sent
332  		 * even if there is no data.
333  		 * We permit 0-byte sends; rds-ping depends on this.
334  		 * However, if there are exclusively attached silent ops,
335  		 * we skip the hdr/data send, to enable silent operation.
336  		 */
337  		if (rm->data.op_nents == 0) {
338  			int ops_present;
339  			int all_ops_are_silent = 1;
340  
341  			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
342  			if (rm->atomic.op_active && !rm->atomic.op_silent)
343  				all_ops_are_silent = 0;
344  			if (rm->rdma.op_active && !rm->rdma.op_silent)
345  				all_ops_are_silent = 0;
346  
347  			if (ops_present && all_ops_are_silent &&
348  			    !rm->m_rdma_cookie)
349  				rm->data.op_active = 0;
350  		}
351  
352  		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
353  			rm->m_final_op = &rm->data;
354  
355  			ret = conn->c_trans->xmit(conn, rm,
356  						  cp->cp_xmit_hdr_off,
357  						  cp->cp_xmit_sg,
358  						  cp->cp_xmit_data_off);
359  			if (ret <= 0)
360  				break;
361  
362  			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
363  				tmp = min_t(int, ret,
364  					    sizeof(struct rds_header) -
365  					    cp->cp_xmit_hdr_off);
366  				cp->cp_xmit_hdr_off += tmp;
367  				ret -= tmp;
368  			}
369  
370  			sg = &rm->data.op_sg[cp->cp_xmit_sg];
371  			while (ret) {
372  				tmp = min_t(int, ret, sg->length -
373  						      cp->cp_xmit_data_off);
374  				cp->cp_xmit_data_off += tmp;
375  				ret -= tmp;
376  				if (cp->cp_xmit_data_off == sg->length) {
377  					cp->cp_xmit_data_off = 0;
378  					sg++;
379  					cp->cp_xmit_sg++;
380  					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
381  					       rm->data.op_nents);
382  				}
383  			}
384  
385  			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
386  			    (cp->cp_xmit_sg == rm->data.op_nents))
387  				cp->cp_xmit_data_sent = 1;
388  		}
389  
390  		/*
391  		 * An rm will only take multiple passes through this loop
392  		 * if there is a data op. Thus, if the data is sent (or there was
393  		 * none), then we're done with the rm.
394  		 */
395  		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
396  			cp->cp_xmit_rm = NULL;
397  			cp->cp_xmit_sg = 0;
398  			cp->cp_xmit_hdr_off = 0;
399  			cp->cp_xmit_data_off = 0;
400  			cp->cp_xmit_rdma_sent = 0;
401  			cp->cp_xmit_atomic_sent = 0;
402  			cp->cp_xmit_data_sent = 0;
403  
404  			rds_message_put(rm);
405  		}
406  	}
407  
408  over_batch:
409  	if (conn->c_trans->xmit_path_complete)
410  		conn->c_trans->xmit_path_complete(cp);
411  	release_in_xmit(cp);
412  
413  	/* Nuke any messages we decided not to retransmit. */
414  	if (!list_empty(&to_be_dropped)) {
415  		/* irqs on here, so we can put(), unlike above */
416  		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
417  			rds_message_put(rm);
418  		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
419  	}
420  
421  	/*
422  	 * Other senders can queue a message after we last test the send queue
423  	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
424  	 * not try and send their newly queued message.  We need to check the
425  	 * send queue after having cleared RDS_IN_XMIT so that their message
426  	 * doesn't get stuck on the send queue.
427  	 *
428  	 * If the transport cannot continue (i.e. ret != 0), then it must
429  	 * call us when more room is available, such as from the tx
430  	 * completion handler.
431  	 *
432  	 * We have an extra generation check here so that if someone manages
433  	 * to jump in after our release_in_xmit, we'll see that they have done
434  	 * some work and we will skip our goto.
435  	 */
436  	if (ret == 0) {
437  		bool raced;
438  
439  		smp_mb();
440  		raced = send_gen != READ_ONCE(cp->cp_send_gen);
441  
442  		if ((test_bit(0, &conn->c_map_queued) ||
443  		    !list_empty(&cp->cp_send_queue)) && !raced) {
444  			if (batch_count < send_batch_count)
445  				goto restart;
446  			rcu_read_lock();
447  			if (rds_destroy_pending(cp->cp_conn))
448  				ret = -ENETUNREACH;
449  			else
450  				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
451  			rcu_read_unlock();
452  		} else if (raced) {
453  			rds_stats_inc(s_send_lock_queue_raced);
454  		}
455  	}
456  out:
457  	return ret;
458  }
459  EXPORT_SYMBOL_GPL(rds_send_xmit);
460  
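/*
 * Return a completed or dropped message's bytes to the socket's send
 * buffer accounting.  The caller must hold rs->rs_lock.
 */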
461  static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
462  {
463  	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
464  
465  	assert_spin_locked(&rs->rs_lock);
466  
467  	BUG_ON(rs->rs_snd_bytes < len);
468  	rs->rs_snd_bytes -= len;
469  
470  	if (rs->rs_snd_bytes == 0)
471  		rds_stats_inc(s_send_queue_empty);
472  }
473  
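/*
 * A message counts as acked when the transport's is_acked callback says
 * so, or (by default) when its header sequence number is covered by the
 * cumulative ack.
 */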
474  static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
475  				    is_acked_func is_acked)
476  {
477  	if (is_acked)
478  		return is_acked(rm, ack);
479  	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
480  }
481  
482  /*
483   * This is pretty similar to what happens below in the ACK
484   * handling code - except that we call here as soon as we get
485   * the IB send completion on the RDMA op and the accompanying
486   * message.
487   */
488  void rds_rdma_send_complete(struct rds_message *rm, int status)
489  {
490  	struct rds_sock *rs = NULL;
491  	struct rm_rdma_op *ro;
492  	struct rds_notifier *notifier;
493  	unsigned long flags;
494  
495  	spin_lock_irqsave(&rm->m_rs_lock, flags);
496  
497  	ro = &rm->rdma;
498  	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
499  	    ro->op_active && ro->op_notify && ro->op_notifier) {
500  		notifier = ro->op_notifier;
501  		rs = rm->m_rs;
502  		sock_hold(rds_rs_to_sk(rs));
503  
504  		notifier->n_status = status;
505  		spin_lock(&rs->rs_lock);
506  		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
507  		spin_unlock(&rs->rs_lock);
508  
509  		ro->op_notifier = NULL;
510  	}
511  
512  	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
513  
514  	if (rs) {
515  		rds_wake_sk_sleep(rs);
516  		sock_put(rds_rs_to_sk(rs));
517  	}
518  }
519  EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
520  
521  /*
522   * Just like above, except looks at atomic op
523   */
524  void rds_atomic_send_complete(struct rds_message *rm, int status)
525  {
526  	struct rds_sock *rs = NULL;
527  	struct rm_atomic_op *ao;
528  	struct rds_notifier *notifier;
529  	unsigned long flags;
530  
531  	spin_lock_irqsave(&rm->m_rs_lock, flags);
532  
533  	ao = &rm->atomic;
534  	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
535  	    ao->op_active && ao->op_notify && ao->op_notifier) {
536  		notifier = ao->op_notifier;
537  		rs = rm->m_rs;
538  		sock_hold(rds_rs_to_sk(rs));
539  
540  		notifier->n_status = status;
541  		spin_lock(&rs->rs_lock);
542  		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
543  		spin_unlock(&rs->rs_lock);
544  
545  		ao->op_notifier = NULL;
546  	}
547  
548  	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
549  
550  	if (rs) {
551  		rds_wake_sk_sleep(rs);
552  		sock_put(rds_rs_to_sk(rs));
553  	}
554  }
555  EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
556  
557  /*
558   * This is the same as rds_rdma_send_complete except we
559   * don't do any locking - we have all the ingredients (message,
560   * socket, socket lock) and can just move the notifier.
561   */
562  static inline void
563  __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
564  {
565  	struct rm_rdma_op *ro;
566  	struct rm_atomic_op *ao;
567  
568  	ro = &rm->rdma;
569  	if (ro->op_active && ro->op_notify && ro->op_notifier) {
570  		ro->op_notifier->n_status = status;
571  		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
572  		ro->op_notifier = NULL;
573  	}
574  
575  	ao = &rm->atomic;
576  	if (ao->op_active && ao->op_notify && ao->op_notifier) {
577  		ao->op_notifier->n_status = status;
578  		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
579  		ao->op_notifier = NULL;
580  	}
581  
582  	/* No need to wake the app - caller does this */
583  }
584  
585  /*
586   * This removes messages from the socket's list if they're on it.  The list
587   * argument must be private to the caller, we must be able to modify it
588   * without locks.  The messages must have a reference held for their
589   * position on the list.  This function will drop that reference after
590   * removing the messages from the 'messages' list regardless of whether it found
591   * the messages on the socket list or not.
592   */
593  static void rds_send_remove_from_sock(struct list_head *messages, int status)
594  {
595  	unsigned long flags;
596  	struct rds_sock *rs = NULL;
597  	struct rds_message *rm;
598  
599  	while (!list_empty(messages)) {
600  		int was_on_sock = 0;
601  
602  		rm = list_entry(messages->next, struct rds_message,
603  				m_conn_item);
604  		list_del_init(&rm->m_conn_item);
605  
606  		/*
607  		 * If we see this flag cleared then we're *sure* that someone
608  		 * else beat us to removing it from the sock.  If we race
609  		 * with their flag update we'll get the lock and then really
610  		 * see that the flag has been cleared.
611  		 *
612  		 * The message spinlock makes sure nobody clears rm->m_rs
613  		 * while we're messing with it. It does not prevent the
614  		 * message from being removed from the socket, though.
615  		 */
616  		spin_lock_irqsave(&rm->m_rs_lock, flags);
617  		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
618  			goto unlock_and_drop;
619  
620  		if (rs != rm->m_rs) {
621  			if (rs) {
622  				rds_wake_sk_sleep(rs);
623  				sock_put(rds_rs_to_sk(rs));
624  			}
625  			rs = rm->m_rs;
626  			if (rs)
627  				sock_hold(rds_rs_to_sk(rs));
628  		}
629  		if (!rs)
630  			goto unlock_and_drop;
631  		spin_lock(&rs->rs_lock);
632  
633  		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
634  			struct rm_rdma_op *ro = &rm->rdma;
635  			struct rds_notifier *notifier;
636  
637  			list_del_init(&rm->m_sock_item);
638  			rds_send_sndbuf_remove(rs, rm);
639  
640  			if (ro->op_active && ro->op_notifier &&
641  			       (ro->op_notify || (ro->op_recverr && status))) {
642  				notifier = ro->op_notifier;
643  				list_add_tail(&notifier->n_list,
644  						&rs->rs_notify_queue);
645  				if (!notifier->n_status)
646  					notifier->n_status = status;
647  				rm->rdma.op_notifier = NULL;
648  			}
649  			was_on_sock = 1;
650  		}
651  		spin_unlock(&rs->rs_lock);
652  
653  unlock_and_drop:
654  		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
655  		rds_message_put(rm);
656  		if (was_on_sock)
657  			rds_message_put(rm);
658  	}
659  
660  	if (rs) {
661  		rds_wake_sk_sleep(rs);
662  		sock_put(rds_rs_to_sk(rs));
663  	}
664  }
665  
666  /*
667   * Transports call here when they've determined that the receiver queued
668   * messages up to, and including, the given sequence number.  Messages are
669   * moved to the retrans queue when rds_send_xmit picks them off the send
670   * queue. This means that in the TCP case, the message may not have been
671   * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
672   * checks the RDS_MSG_HAS_ACK_SEQ bit.
673   */
674  void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
675  			      is_acked_func is_acked)
676  {
677  	struct rds_message *rm, *tmp;
678  	unsigned long flags;
679  	LIST_HEAD(list);
680  
681  	spin_lock_irqsave(&cp->cp_lock, flags);
682  
683  	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
684  		if (!rds_send_is_acked(rm, ack, is_acked))
685  			break;
686  
687  		list_move(&rm->m_conn_item, &list);
688  		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
689  	}
690  
691  	/* order flag updates with spin locks */
692  	if (!list_empty(&list))
693  		smp_mb__after_atomic();
694  
695  	spin_unlock_irqrestore(&cp->cp_lock, flags);
696  
697  	/* now remove the messages from the sock list as needed */
698  	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
699  }
700  EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
701  
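/* Non-multipath wrapper: apply the ack to the connection's single path. */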
702  void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
703  			 is_acked_func is_acked)
704  {
705  	WARN_ON(conn->c_trans->t_mp_capable);
706  	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
707  }
708  EXPORT_SYMBOL_GPL(rds_send_drop_acked);
709  
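/*
 * Drop this socket's queued messages, either those addressed to 'dest'
 * or all of them when dest is NULL.  Dropped messages are unhooked from
 * their connection and their notifiers are completed with
 * RDS_RDMA_CANCELED.
 */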
710  void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
711  {
712  	struct rds_message *rm, *tmp;
713  	struct rds_connection *conn;
714  	struct rds_conn_path *cp;
715  	unsigned long flags;
716  	LIST_HEAD(list);
717  
718  	/* get all the messages we're dropping under the rs lock */
719  	spin_lock_irqsave(&rs->rs_lock, flags);
720  
721  	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
722  		if (dest &&
723  		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
724  		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
725  			continue;
726  
727  		list_move(&rm->m_sock_item, &list);
728  		rds_send_sndbuf_remove(rs, rm);
729  		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
730  	}
731  
732  	/* order flag updates with the rs lock */
733  	smp_mb__after_atomic();
734  
735  	spin_unlock_irqrestore(&rs->rs_lock, flags);
736  
737  	if (list_empty(&list))
738  		return;
739  
740  	/* Remove the messages from the conn */
741  	list_for_each_entry(rm, &list, m_sock_item) {
742  
743  		conn = rm->m_inc.i_conn;
744  		if (conn->c_trans->t_mp_capable)
745  			cp = rm->m_inc.i_conn_path;
746  		else
747  			cp = &conn->c_path[0];
748  
749  		spin_lock_irqsave(&cp->cp_lock, flags);
750  		/*
751  		 * Maybe someone else beat us to removing rm from the conn.
752  		 * If we race with their flag update we'll get the lock and
753  		 * then really see that the flag has been cleared.
754  		 */
755  		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
756  			spin_unlock_irqrestore(&cp->cp_lock, flags);
757  			continue;
758  		}
759  		list_del_init(&rm->m_conn_item);
760  		spin_unlock_irqrestore(&cp->cp_lock, flags);
761  
762  		/*
763  		 * Couldn't grab m_rs_lock in top loop (lock ordering),
764  		 * but we can now.
765  		 */
766  		spin_lock_irqsave(&rm->m_rs_lock, flags);
767  
768  		spin_lock(&rs->rs_lock);
769  		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
770  		spin_unlock(&rs->rs_lock);
771  
772  		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
773  
774  		rds_message_put(rm);
775  	}
776  
777  	rds_wake_sk_sleep(rs);
778  
779  	while (!list_empty(&list)) {
780  		rm = list_entry(list.next, struct rds_message, m_sock_item);
781  		list_del_init(&rm->m_sock_item);
782  		rds_message_wait(rm);
783  
784  		/* just in case the code above skipped this message
785  		 * because RDS_MSG_ON_CONN wasn't set, run it again here.
786  		 * Taking m_rs_lock is the only thing that keeps us
787  		 * from racing with ack processing.
788  		 */
789  		spin_lock_irqsave(&rm->m_rs_lock, flags);
790  
791  		spin_lock(&rs->rs_lock);
792  		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
793  		spin_unlock(&rs->rs_lock);
794  
795  		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
796  
797  		rds_message_put(rm);
798  	}
799  }
800  
801  /*
802   * we only want this to fire once so we use the caller's 'queued'.  It's
803   * possible that another thread can race with us and remove the
804   * message from the flow with RDS_CANCEL_SENT_TO.
805   */
806  static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
807  			     struct rds_conn_path *cp,
808  			     struct rds_message *rm, __be16 sport,
809  			     __be16 dport, int *queued)
810  {
811  	unsigned long flags;
812  	u32 len;
813  
814  	if (*queued)
815  		goto out;
816  
817  	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
818  
819  	/* this is the only place which holds both the socket's rs_lock
820  	 * and the connection's c_lock */
821  	spin_lock_irqsave(&rs->rs_lock, flags);
822  
823  	/*
824  	 * If there is only a little space left in sndbuf, we don't queue
825  	 * anything and userspace gets -EAGAIN. But poll() indicates there's send
826  	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
827  	 * freed up by incoming acks. So we check the *old* value of
828  	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
829  	 * and poll() now knows no more data can be sent.
830  	 */
831  	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
832  		rs->rs_snd_bytes += len;
833  
834  		/* let recv side know we are close to send space exhaustion.
835  		 * This is probably not the optimal way to do it, as this
836  		 * means we set the flag on *all* messages as soon as our
837  		 * throughput hits a certain threshold.
838  		 */
839  		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
840  			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
841  
842  		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
843  		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
844  		rds_message_addref(rm);
845  		sock_hold(rds_rs_to_sk(rs));
846  		rm->m_rs = rs;
847  
848  		/* The code ordering is a little weird, but we're
849  		 * trying to minimize the time we hold c_lock */
850  		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
851  		rm->m_inc.i_conn = conn;
852  		rm->m_inc.i_conn_path = cp;
853  		rds_message_addref(rm);
854  
855  		spin_lock(&cp->cp_lock);
856  		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
857  		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
858  		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
859  		spin_unlock(&cp->cp_lock);
860  
861  		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
862  			 rm, len, rs, rs->rs_snd_bytes,
863  			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
864  
865  		*queued = 1;
866  	}
867  
868  	spin_unlock_irqrestore(&rs->rs_lock, flags);
869  out:
870  	return *queued;
871  }
872  
873  /*
874   * rds_message is getting to be quite complicated, and we'd like to allocate
875   * it all in one go. This figures out how big it needs to be up front.
876   */
877  static int rds_rm_size(struct msghdr *msg, int num_sgs,
878  		       struct rds_iov_vector_arr *vct)
879  {
880  	struct cmsghdr *cmsg;
881  	int size = 0;
882  	int cmsg_groups = 0;
883  	int retval;
884  	bool zcopy_cookie = false;
885  	struct rds_iov_vector *iov, *tmp_iov;
886  
887  	if (num_sgs < 0)
888  		return -EINVAL;
889  
890  	for_each_cmsghdr(cmsg, msg) {
891  		if (!CMSG_OK(msg, cmsg))
892  			return -EINVAL;
893  
894  		if (cmsg->cmsg_level != SOL_RDS)
895  			continue;
896  
897  		switch (cmsg->cmsg_type) {
898  		case RDS_CMSG_RDMA_ARGS:
899  			if (vct->indx >= vct->len) {
900  				vct->len += vct->incr;
901  				tmp_iov =
902  					krealloc(vct->vec,
903  						 vct->len *
904  						 sizeof(struct rds_iov_vector),
905  						 GFP_KERNEL);
906  				if (!tmp_iov) {
907  					vct->len -= vct->incr;
908  					return -ENOMEM;
909  				}
910  				vct->vec = tmp_iov;
911  			}
912  			iov = &vct->vec[vct->indx];
913  			memset(iov, 0, sizeof(struct rds_iov_vector));
914  			vct->indx++;
915  			cmsg_groups |= 1;
916  			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
917  			if (retval < 0)
918  				return retval;
919  			size += retval;
920  
921  			break;
922  
923  		case RDS_CMSG_ZCOPY_COOKIE:
924  			zcopy_cookie = true;
925  			/* fall through */
926  
927  		case RDS_CMSG_RDMA_DEST:
928  		case RDS_CMSG_RDMA_MAP:
929  			cmsg_groups |= 2;
930  			/* these are valid but do not add any size */
931  			break;
932  
933  		case RDS_CMSG_ATOMIC_CSWP:
934  		case RDS_CMSG_ATOMIC_FADD:
935  		case RDS_CMSG_MASKED_ATOMIC_CSWP:
936  		case RDS_CMSG_MASKED_ATOMIC_FADD:
937  			cmsg_groups |= 1;
938  			size += sizeof(struct scatterlist);
939  			break;
940  
941  		default:
942  			return -EINVAL;
943  		}
944  
945  	}
946  
947  	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
948  		return -EINVAL;
949  
950  	size += num_sgs * sizeof(struct scatterlist);
951  
952  	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
953  	if (cmsg_groups == 3)
954  		return -EINVAL;
955  
956  	return size;
957  }
958  
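/*
 * Stash the user-supplied zerocopy cookie in the message's zcopy
 * notifier so the eventual completion notification can be matched back
 * to this send.
 */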
959  static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
960  			  struct cmsghdr *cmsg)
961  {
962  	u32 *cookie;
963  
964  	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
965  	    !rm->data.op_mmp_znotifier)
966  		return -EINVAL;
967  	cookie = CMSG_DATA(cmsg);
968  	rm->data.op_mmp_znotifier->z_cookie = *cookie;
969  	return 0;
970  }
971  
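/*
 * Walk the SOL_RDS control messages attached to sendmsg() and apply each
 * one to the message being built (RDMA args, RDMA dest/map, atomic ops,
 * zerocopy cookie).  Processing stops at the first error.
 */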
972  static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
973  			 struct msghdr *msg, int *allocated_mr,
974  			 struct rds_iov_vector_arr *vct)
975  {
976  	struct cmsghdr *cmsg;
977  	int ret = 0, ind = 0;
978  
979  	for_each_cmsghdr(cmsg, msg) {
980  		if (!CMSG_OK(msg, cmsg))
981  			return -EINVAL;
982  
983  		if (cmsg->cmsg_level != SOL_RDS)
984  			continue;
985  
986  		/* As a side effect, RDMA_DEST and RDMA_MAP will set
987  		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
988  		 */
989  		switch (cmsg->cmsg_type) {
990  		case RDS_CMSG_RDMA_ARGS:
991  			if (ind >= vct->indx)
992  				return -ENOMEM;
993  			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
994  			ind++;
995  			break;
996  
997  		case RDS_CMSG_RDMA_DEST:
998  			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
999  			break;
1000  
1001  		case RDS_CMSG_RDMA_MAP:
1002  			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
1003  			if (!ret)
1004  				*allocated_mr = 1;
1005  			else if (ret == -ENODEV)
1006  				/* Accommodate the get_mr() case which can fail
1007  				 * if connection isn't established yet.
1008  				 */
1009  				ret = -EAGAIN;
1010  			break;
1011  		case RDS_CMSG_ATOMIC_CSWP:
1012  		case RDS_CMSG_ATOMIC_FADD:
1013  		case RDS_CMSG_MASKED_ATOMIC_CSWP:
1014  		case RDS_CMSG_MASKED_ATOMIC_FADD:
1015  			ret = rds_cmsg_atomic(rs, rm, cmsg);
1016  			break;
1017  
1018  		case RDS_CMSG_ZCOPY_COOKIE:
1019  			ret = rds_cmsg_zcopy(rs, rm, cmsg);
1020  			break;
1021  
1022  		default:
1023  			return -EINVAL;
1024  		}
1025  
1026  		if (ret)
1027  			break;
1028  	}
1029  
1030  	return ret;
1031  }
1032  
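/*
 * Pick the connection path for an outgoing message on a multipath
 * capable transport.  Until the handshake ping has told us how many
 * paths the peer supports (c_npaths == 0), a blocking sender may wait
 * for the answer while a non-blocking sender falls back to path 0.
 */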
1033  static int rds_send_mprds_hash(struct rds_sock *rs,
1034  			       struct rds_connection *conn, int nonblock)
1035  {
1036  	int hash;
1037  
1038  	if (conn->c_npaths == 0)
1039  		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
1040  	else
1041  		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
1042  	if (conn->c_npaths == 0 && hash != 0) {
1043  		rds_send_ping(conn, 0);
1044  
1045  		/* The underlying connection is not up yet.  Need to wait
1046  		 * until it is up to be sure that the non-zero c_path can be
1047  		 * used.  But if we are interrupted, we have to use the zero
1048  		 * c_path in case the connection ends up being non-MP capable.
1049  		 */
1050  		if (conn->c_npaths == 0) {
1051  			/* Cannot wait for the connection to be made, so just use
1052  			 * the base c_path.
1053  			 */
1054  			if (nonblock)
1055  				return 0;
1056  			if (wait_event_interruptible(conn->c_hs_waitq,
1057  						     conn->c_npaths != 0))
1058  				hash = 0;
1059  		}
1060  		if (conn->c_npaths == 1)
1061  			hash = 0;
1062  	}
1063  	return hash;
1064  }
1065  
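/*
 * Add up the RDMA payload requested by any RDS_CMSG_RDMA_ARGS control
 * messages so the caller can check it against RDS_MAX_MSG_SIZE.
 */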
1066  static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
1067  {
1068  	struct rds_rdma_args *args;
1069  	struct cmsghdr *cmsg;
1070  
1071  	for_each_cmsghdr(cmsg, msg) {
1072  		if (!CMSG_OK(msg, cmsg))
1073  			return -EINVAL;
1074  
1075  		if (cmsg->cmsg_level != SOL_RDS)
1076  			continue;
1077  
1078  		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
1079  			if (cmsg->cmsg_len <
1080  			    CMSG_LEN(sizeof(struct rds_rdma_args)))
1081  				return -EINVAL;
1082  			args = CMSG_DATA(cmsg);
1083  			*rdma_bytes += args->remote_vec.bytes;
1084  		}
1085  	}
1086  	return 0;
1087  }
1088  
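/*
 * sendmsg() entry point: validate the destination address, size and
 * build the message from the iovec and control messages, queue it on
 * the socket and connection, and kick the transmit path (directly or
 * via rds_wq).
 */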
1089  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1090  {
1091  	struct sock *sk = sock->sk;
1092  	struct rds_sock *rs = rds_sk_to_rs(sk);
1093  	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
1094  	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
1095  	__be16 dport;
1096  	struct rds_message *rm = NULL;
1097  	struct rds_connection *conn;
1098  	int ret = 0;
1099  	int queued = 0, allocated_mr = 0;
1100  	int nonblock = msg->msg_flags & MSG_DONTWAIT;
1101  	long timeo = sock_sndtimeo(sk, nonblock);
1102  	struct rds_conn_path *cpath;
1103  	struct in6_addr daddr;
1104  	__u32 scope_id = 0;
1105  	size_t total_payload_len = payload_len, rdma_payload_len = 0;
1106  	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
1107  		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
1108  	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
1109  	int namelen;
1110  	struct rds_iov_vector_arr vct;
1111  	int ind;
1112  
1113  	memset(&vct, 0, sizeof(vct));
1114  
1115  	/* Expect 1 RDMA CMSG per rds_sendmsg. Can still grow if more are needed. */
1116  	vct.incr = 1;
1117  
1118  	/* Mirror Linux UDP, which in turn mirrors BSD error message compatibility */
1119  	/* XXX: Perhaps MSG_MORE someday */
1120  	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
1121  		ret = -EOPNOTSUPP;
1122  		goto out;
1123  	}
1124  
1125  	namelen = msg->msg_namelen;
1126  	if (namelen != 0) {
1127  		if (namelen < sizeof(*usin)) {
1128  			ret = -EINVAL;
1129  			goto out;
1130  		}
1131  		switch (usin->sin_family) {
1132  		case AF_INET:
1133  			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
1134  			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
1135  			    IN_MULTICAST(ntohl(usin->sin_addr.s_addr))) {
1136  				ret = -EINVAL;
1137  				goto out;
1138  			}
1139  			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
1140  			dport = usin->sin_port;
1141  			break;
1142  
1143  #if IS_ENABLED(CONFIG_IPV6)
1144  		case AF_INET6: {
1145  			int addr_type;
1146  
1147  			if (namelen < sizeof(*sin6)) {
1148  				ret = -EINVAL;
1149  				goto out;
1150  			}
1151  			addr_type = ipv6_addr_type(&sin6->sin6_addr);
1152  			if (!(addr_type & IPV6_ADDR_UNICAST)) {
1153  				__be32 addr4;
1154  
1155  				if (!(addr_type & IPV6_ADDR_MAPPED)) {
1156  					ret = -EINVAL;
1157  					goto out;
1158  				}
1159  
1160  				/* It is a mapped address.  Need to do some
1161  				 * sanity checks.
1162  				 */
1163  				addr4 = sin6->sin6_addr.s6_addr32[3];
1164  				if (addr4 == htonl(INADDR_ANY) ||
1165  				    addr4 == htonl(INADDR_BROADCAST) ||
1166  				    IN_MULTICAST(ntohl(addr4))) {
1167  					ret = -EINVAL;
1168  					goto out;
1169  				}
1170  			}
1171  			if (addr_type & IPV6_ADDR_LINKLOCAL) {
1172  				if (sin6->sin6_scope_id == 0) {
1173  					ret = -EINVAL;
1174  					goto out;
1175  				}
1176  				scope_id = sin6->sin6_scope_id;
1177  			}
1178  
1179  			daddr = sin6->sin6_addr;
1180  			dport = sin6->sin6_port;
1181  			break;
1182  		}
1183  #endif
1184  
1185  		default:
1186  			ret = -EINVAL;
1187  			goto out;
1188  		}
1189  	} else {
1190  		/* We only care about consistency with ->connect() */
1191  		lock_sock(sk);
1192  		daddr = rs->rs_conn_addr;
1193  		dport = rs->rs_conn_port;
1194  		scope_id = rs->rs_bound_scope_id;
1195  		release_sock(sk);
1196  	}
1197  
1198  	lock_sock(sk);
1199  	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
1200  		release_sock(sk);
1201  		ret = -ENOTCONN;
1202  		goto out;
1203  	} else if (namelen != 0) {
1204  		/* Cannot send to an IPv4 address using an IPv6 source
1205  		 * address and cannot send to an IPv6 address using an
1206  		 * IPv4 source address.
1207  		 */
1208  		if (ipv6_addr_v4mapped(&daddr) ^
1209  		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
1210  			release_sock(sk);
1211  			ret = -EOPNOTSUPP;
1212  			goto out;
1213  		}
1214  		/* If the socket is already bound to a link local address,
1215  		 * it can only send to peers on the same link.  But allow
1216  		 * communicating between link-local and non-link-local addresses.
1217  		 */
1218  		if (scope_id != rs->rs_bound_scope_id) {
1219  			if (!scope_id) {
1220  				scope_id = rs->rs_bound_scope_id;
1221  			} else if (rs->rs_bound_scope_id) {
1222  				release_sock(sk);
1223  				ret = -EINVAL;
1224  				goto out;
1225  			}
1226  		}
1227  	}
1228  	release_sock(sk);
1229  
1230  	ret = rds_rdma_bytes(msg, &rdma_payload_len);
1231  	if (ret)
1232  		goto out;
1233  
1234  	total_payload_len += rdma_payload_len;
1235  	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
1236  		ret = -EMSGSIZE;
1237  		goto out;
1238  	}
1239  
1240  	if (payload_len > rds_sk_sndbuf(rs)) {
1241  		ret = -EMSGSIZE;
1242  		goto out;
1243  	}
1244  
1245  	if (zcopy) {
1246  		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
1247  			ret = -EOPNOTSUPP;
1248  			goto out;
1249  		}
1250  		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
1251  	}
1252  	/* size of rm including all sgs */
1253  	ret = rds_rm_size(msg, num_sgs, &vct);
1254  	if (ret < 0)
1255  		goto out;
1256  
1257  	rm = rds_message_alloc(ret, GFP_KERNEL);
1258  	if (!rm) {
1259  		ret = -ENOMEM;
1260  		goto out;
1261  	}
1262  
1263  	/* Attach data to the rm */
1264  	if (payload_len) {
1265  		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs, &ret);
1266  		if (!rm->data.op_sg)
1267  			goto out;
1268  		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
1269  		if (ret)
1270  			goto out;
1271  	}
1272  	rm->data.op_active = 1;
1273  
1274  	rm->m_daddr = daddr;
1275  
1276  	/* rds_conn_create has a spinlock that runs with IRQ off.
1277  	 * Caching the conn in the socket helps a lot. */
1278  	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
1279  	    rs->rs_tos == rs->rs_conn->c_tos) {
1280  		conn = rs->rs_conn;
1281  	} else {
1282  		conn = rds_conn_create_outgoing(sock_net(sock->sk),
1283  						&rs->rs_bound_addr, &daddr,
1284  						rs->rs_transport, rs->rs_tos,
1285  						sock->sk->sk_allocation,
1286  						scope_id);
1287  		if (IS_ERR(conn)) {
1288  			ret = PTR_ERR(conn);
1289  			goto out;
1290  		}
1291  		rs->rs_conn = conn;
1292  	}
1293  
1294  	if (conn->c_trans->t_mp_capable)
1295  		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
1296  	else
1297  		cpath = &conn->c_path[0];
1298  
1299  	rm->m_conn_path = cpath;
1300  
1301  	/* Parse any control messages the user may have included. */
1302  	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
1303  	if (ret) {
1304  		/* Trigger connection so that it's ready for the next retry */
1305  		if (ret == -EAGAIN)
1306  			rds_conn_connect_if_down(conn);
1307  		goto out;
1308  	}
1309  
1310  	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1311  		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1312  			       &rm->rdma, conn->c_trans->xmit_rdma);
1313  		ret = -EOPNOTSUPP;
1314  		goto out;
1315  	}
1316  
1317  	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1318  		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1319  			       &rm->atomic, conn->c_trans->xmit_atomic);
1320  		ret = -EOPNOTSUPP;
1321  		goto out;
1322  	}
1323  
1324  	if (rds_destroy_pending(conn)) {
1325  		ret = -EAGAIN;
1326  		goto out;
1327  	}
1328  
1329  	rds_conn_path_connect_if_down(cpath);
1330  
1331  	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1332  	if (ret) {
1333  		rs->rs_seen_congestion = 1;
1334  		goto out;
1335  	}
1336  	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1337  				  dport, &queued)) {
1338  		rds_stats_inc(s_send_queue_full);
1339  
1340  		if (nonblock) {
1341  			ret = -EAGAIN;
1342  			goto out;
1343  		}
1344  
1345  		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1346  					rds_send_queue_rm(rs, conn, cpath, rm,
1347  							  rs->rs_bound_port,
1348  							  dport,
1349  							  &queued),
1350  					timeo);
1351  		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
1352  		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1353  			continue;
1354  
1355  		ret = timeo;
1356  		if (ret == 0)
1357  			ret = -ETIMEDOUT;
1358  		goto out;
1359  	}
1360  
1361  	/*
1362  	 * By now we've committed to the send.  We reuse rds_send_worker()
1363  	 * to retry sends in the rds thread if the transport asks us to.
1364  	 */
1365  	rds_stats_inc(s_send_queued);
1366  
1367  	ret = rds_send_xmit(cpath);
1368  	if (ret == -ENOMEM || ret == -EAGAIN) {
1369  		ret = 0;
1370  		rcu_read_lock();
1371  		if (rds_destroy_pending(cpath->cp_conn))
1372  			ret = -ENETUNREACH;
1373  		else
1374  			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1375  		rcu_read_unlock();
1376  	}
1377  	if (ret)
1378  		goto out;
1379  	rds_message_put(rm);
1380  
1381  	for (ind = 0; ind < vct.indx; ind++)
1382  		kfree(vct.vec[ind].iov);
1383  	kfree(vct.vec);
1384  
1385  	return payload_len;
1386  
1387  out:
1388  	for (ind = 0; ind < vct.indx; ind++)
1389  		kfree(vct.vec[ind].iov);
1390  	kfree(vct.vec);
1391  
1392  	/* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
1393  	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1394  	 * or in any other way, we need to destroy the MR again. */
1395  	if (allocated_mr)
1396  		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1397  
1398  	if (rm)
1399  		rds_message_put(rm);
1400  	return ret;
1401  }
1402  
1403  /*
1404   * send out a probe. Can be shared by rds_send_ping,
1405   * rds_send_pong, rds_send_hb.
1406   * rds_send_hb should use h_flags
1407   *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1408   * or
1409   *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
1410   */
1411  static int
1412  rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1413  	       __be16 dport, u8 h_flags)
1414  {
1415  	struct rds_message *rm;
1416  	unsigned long flags;
1417  	int ret = 0;
1418  
1419  	rm = rds_message_alloc(0, GFP_ATOMIC);
1420  	if (!rm) {
1421  		ret = -ENOMEM;
1422  		goto out;
1423  	}
1424  
1425  	rm->m_daddr = cp->cp_conn->c_faddr;
1426  	rm->data.op_active = 1;
1427  
1428  	rds_conn_path_connect_if_down(cp);
1429  
1430  	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1431  	if (ret)
1432  		goto out;
1433  
1434  	spin_lock_irqsave(&cp->cp_lock, flags);
1435  	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1436  	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1437  	rds_message_addref(rm);
1438  	rm->m_inc.i_conn = cp->cp_conn;
1439  	rm->m_inc.i_conn_path = cp;
1440  
1441  	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1442  				    cp->cp_next_tx_seq);
1443  	rm->m_inc.i_hdr.h_flags |= h_flags;
1444  	cp->cp_next_tx_seq++;
1445  
1446  	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
1447  	    cp->cp_conn->c_trans->t_mp_capable) {
1448  		__be16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
1449  		__be32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
1450  
1451  		rds_message_add_extension(&rm->m_inc.i_hdr,
1452  					  RDS_EXTHDR_NPATHS, &npaths,
1453  					  sizeof(npaths));
1454  		rds_message_add_extension(&rm->m_inc.i_hdr,
1455  					  RDS_EXTHDR_GEN_NUM,
1456  					  &my_gen_num,
1457  					  sizeof(u32));
1458  	}
1459  	spin_unlock_irqrestore(&cp->cp_lock, flags);
1460  
1461  	rds_stats_inc(s_send_queued);
1462  	rds_stats_inc(s_send_pong);
1463  
1464  	/* schedule the send work on rds_wq */
1465  	rcu_read_lock();
1466  	if (!rds_destroy_pending(cp->cp_conn))
1467  		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1468  	rcu_read_unlock();
1469  
1470  	rds_message_put(rm);
1471  	return 0;
1472  
1473  out:
1474  	if (rm)
1475  		rds_message_put(rm);
1476  	return ret;
1477  }
1478  
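/* Answer a ping: send a probe from port 0 back to the ping's source port. */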
1479  int
1480  rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1481  {
1482  	return rds_send_probe(cp, 0, dport, 0);
1483  }
1484  
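/*
 * Send a one-time probe from RDS_FLAG_PROBE_PORT to port 0 on the given
 * path; c_ping_triggered ensures this fires only once per connection.
 * MP-capable transports use this handshake to learn the peer's path count.
 */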
1485  void
1486  rds_send_ping(struct rds_connection *conn, int cp_index)
1487  {
1488  	unsigned long flags;
1489  	struct rds_conn_path *cp = &conn->c_path[cp_index];
1490  
1491  	spin_lock_irqsave(&cp->cp_lock, flags);
1492  	if (conn->c_ping_triggered) {
1493  		spin_unlock_irqrestore(&cp->cp_lock, flags);
1494  		return;
1495  	}
1496  	conn->c_ping_triggered = 1;
1497  	spin_unlock_irqrestore(&cp->cp_lock, flags);
1498  	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
1499  }
1500  EXPORT_SYMBOL_GPL(rds_send_ping);
1501