xref: /openbmc/linux/net/smc/smc_tx.c (revision 60630924bb5af8751adcecc896e7763c3783ca89)
1  // SPDX-License-Identifier: GPL-2.0
2  /*
3   * Shared Memory Communications over RDMA (SMC-R) and RoCE
4   *
5   * Manage send buffer.
6   * Producer:
7   * Copy user space data into the send buffer, if send buffer space is available.
8   * Consumer:
9   * Trigger RDMA write into the peer's RMBE and send a CDC message, if RMBE space is available.
10   *
11   * Copyright IBM Corp. 2016
12   *
13   * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
14   */
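
/* Rough layout of this file (summary of what the code below does):
 * - producer side: smc_tx_sendmsg() copies user data into the ring-shaped
 *   sndbuf, smc_tx_wait() blocks the producer when it is full, and
 *   smc_tx_write_space() wakes blocked producers again.
 * - consumer side: smc_tx_sndbuf_nonempty() / smc_tx_work() move the data
 *   into the peer RMBE via RDMA write (SMC-R) or ISM write (SMC-D) and
 *   announce it with a CDC message.
 */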
15  
16  #include <linux/net.h>
17  #include <linux/rcupdate.h>
18  #include <linux/workqueue.h>
19  #include <linux/sched/signal.h>
20  
21  #include <net/sock.h>
22  #include <net/tcp.h>
23  
24  #include "smc.h"
25  #include "smc_wr.h"
26  #include "smc_cdc.h"
27  #include "smc_close.h"
28  #include "smc_ism.h"
29  #include "smc_tx.h"
30  #include "smc_stats.h"
31  #include "smc_tracepoint.h"
32  
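/* delays in jiffies for the connection's tx_work: SMC_TX_WORK_DELAY means
 * "run as soon as possible", SMC_TX_CORK_DELAY defers the RDMA writes for
 * corked / MSG_MORE sends (see smc_tx_sendmsg()).
 */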
33  #define SMC_TX_WORK_DELAY	0
34  #define SMC_TX_CORK_DELAY	(HZ >> 2)	/* 250 ms */
35  
36  /***************************** sndbuf producer *******************************/
37  
38  /* callback implementation for sk.sk_write_space()
39   * to wake up sndbuf producers that blocked with smc_tx_wait().
40   * called under sk_socket lock.
41   */
42  static void smc_tx_write_space(struct sock *sk)
43  {
44  	struct socket *sock = sk->sk_socket;
45  	struct smc_sock *smc = smc_sk(sk);
46  	struct socket_wq *wq;
47  
48  	/* similar to sk_stream_write_space */
49  	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
50  		if (test_bit(SOCK_NOSPACE, &sock->flags))
51  			SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk);
52  		clear_bit(SOCK_NOSPACE, &sock->flags);
53  		rcu_read_lock();
54  		wq = rcu_dereference(sk->sk_wq);
55  		if (skwq_has_sleeper(wq))
56  			wake_up_interruptible_poll(&wq->wait,
57  						   EPOLLOUT | EPOLLWRNORM |
58  						   EPOLLWRBAND);
59  		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
60  			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
61  		rcu_read_unlock();
62  	}
63  }
64  
65  /* Wake up sndbuf producers that blocked with smc_tx_wait().
66   * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
67   */
68  void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
69  {
70  	if (smc->sk.sk_socket &&
71  	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
72  		smc->sk.sk_write_space(&smc->sk);
73  }
74  
75  /* blocks the sndbuf producer until at least one byte of free space is available
76   * or the urgent byte was consumed
77   */
78  static int smc_tx_wait(struct smc_sock *smc, int flags)
79  {
80  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
81  	struct smc_connection *conn = &smc->conn;
82  	struct sock *sk = &smc->sk;
83  	long timeo;
84  	int rc = 0;
85  
86  	/* similar to sk_stream_wait_memory */
87  	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
88  	add_wait_queue(sk_sleep(sk), &wait);
89  	while (1) {
90  		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
91  		if (sk->sk_err ||
92  		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
93  		    conn->killed ||
94  		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
95  			rc = -EPIPE;
96  			break;
97  		}
98  		if (smc_cdc_rxed_any_close(conn)) {
99  			rc = -ECONNRESET;
100  			break;
101  		}
102  		if (!timeo) {
103  			/* ensure EPOLLOUT is subsequently generated */
104  			set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
105  			rc = -EAGAIN;
106  			break;
107  		}
108  		if (signal_pending(current)) {
109  			rc = sock_intr_errno(timeo);
110  			break;
111  		}
112  		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
113  		if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
114  			break; /* at least 1 byte of free & no urgent data */
115  		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
116  		sk_wait_event(sk, &timeo,
117  			      sk->sk_err ||
118  			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
119  			      smc_cdc_rxed_any_close(conn) ||
120  			      (atomic_read(&conn->sndbuf_space) &&
121  			       !conn->urg_tx_pend),
122  			      &wait);
123  	}
124  	remove_wait_queue(sk_sleep(sk), &wait);
125  	return rc;
126  }
127  
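/* true if TCP_CORK is set on the internal TCP (CLC) socket */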
128  static bool smc_tx_is_corked(struct smc_sock *smc)
129  {
130  	struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);
131  
132  	return !!(tp->nonagle & TCP_NAGLE_CORK);
133  }
134  
135  /* sndbuf producer: main API called by socket layer.
136   * called under sock lock.
137   */
138  int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
139  {
140  	size_t copylen, send_done = 0, send_remaining = len;
141  	size_t chunk_len, chunk_off, chunk_len_sum;
142  	struct smc_connection *conn = &smc->conn;
143  	union smc_host_cursor prep;
144  	struct sock *sk = &smc->sk;
145  	char *sndbuf_base;
146  	int tx_cnt_prep;
147  	int writespace;
148  	int rc, chunk;
149  
150  	/* This should be in poll */
151  	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
152  
153  	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
154  		rc = -EPIPE;
155  		goto out_err;
156  	}
157  
158  	if (sk->sk_state == SMC_INIT)
159  		return -ENOTCONN;
160  
161  	if (len > conn->sndbuf_desc->len)
162  		SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk);
163  
164  	if (len > conn->peer_rmbe_size)
165  		SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk);
166  
167  	if (msg->msg_flags & MSG_OOB)
168  		SMC_STAT_INC(smc, urg_data_cnt);
169  
170  	while (msg_data_left(msg)) {
171  		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
172  		    (smc->sk.sk_err == ECONNABORTED) ||
173  		    conn->killed)
174  			return -EPIPE;
175  		if (smc_cdc_rxed_any_close(conn))
176  			return send_done ?: -ECONNRESET;
177  
178  		if (msg->msg_flags & MSG_OOB)
179  			conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
180  
181  		if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
182  			if (send_done)
183  				return send_done;
184  			rc = smc_tx_wait(smc, msg->msg_flags);
185  			if (rc)
186  				goto out_err;
187  			continue;
188  		}
189  
190  		/* initialize variables for 1st iteration of subsequent loop */
191  		/* could be just 1 byte, even after smc_tx_wait above */
192  		writespace = atomic_read(&conn->sndbuf_space);
193  		/* not more than what user space asked for */
194  		copylen = min_t(size_t, send_remaining, writespace);
195  		/* determine start of sndbuf */
196  		sndbuf_base = conn->sndbuf_desc->cpu_addr;
197  		smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
198  		tx_cnt_prep = prep.count;
199  		/* determine chunks where to write into sndbuf */
200  		/* either unwrapped case, or 1st chunk of wrapped case */
201  		chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
202  				  tx_cnt_prep);
203  		chunk_len_sum = chunk_len;
204  		chunk_off = tx_cnt_prep;
205  		smc_sndbuf_sync_sg_for_cpu(conn);
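		/* copy in at most two chunks: a second chunk is only needed
		 * when the copy wraps around the end of the ring-shaped sndbuf
		 */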
206  		for (chunk = 0; chunk < 2; chunk++) {
207  			rc = memcpy_from_msg(sndbuf_base + chunk_off,
208  					     msg, chunk_len);
209  			if (rc) {
210  				smc_sndbuf_sync_sg_for_device(conn);
211  				if (send_done)
212  					return send_done;
213  				goto out_err;
214  			}
215  			send_done += chunk_len;
216  			send_remaining -= chunk_len;
217  
218  			if (chunk_len_sum == copylen)
219  				break; /* either on 1st or 2nd iteration */
220  			/* prepare next (== 2nd) iteration */
221  			chunk_len = copylen - chunk_len; /* remainder */
222  			chunk_len_sum += chunk_len;
223  			chunk_off = 0; /* modulo offset in send ring buffer */
224  		}
225  		smc_sndbuf_sync_sg_for_device(conn);
226  		/* update cursors */
227  		smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
228  		smc_curs_copy(&conn->tx_curs_prep, &prep, conn);
229  		/* increased in send tasklet smc_cdc_tx_handler() */
230  		smp_mb__before_atomic();
231  		atomic_sub(copylen, &conn->sndbuf_space);
232  		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
233  		smp_mb__after_atomic();
234  		/* since we just produced new data into the sndbuf,
235  		 * trigger the sndbuf consumer: RDMA write into peer RMBE and CDC
236  		 */
237  		if ((msg->msg_flags & MSG_OOB) && !send_remaining)
238  			conn->urg_tx_pend = true;
239  		if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
240  		    (atomic_read(&conn->sndbuf_space) >
241  						(conn->sndbuf_desc->len >> 1)))
242  			/* for a corked socket defer the RDMA writes if there
243  			 * is still sufficient sndbuf_space available
244  			 */
245  			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
246  					   SMC_TX_CORK_DELAY);
247  		else
248  			smc_tx_sndbuf_nonempty(conn);
249  
250  		trace_smc_tx_sendmsg(smc, copylen);
251  	} /* while (msg_data_left(msg)) */
252  
253  	return send_done;
254  
255  out_err:
256  	rc = sk_stream_error(sk, msg->msg_flags, rc);
257  	/* make sure we wake any epoll edge trigger waiter */
258  	if (unlikely(rc == -EAGAIN))
259  		sk->sk_write_space(sk);
260  	return rc;
261  }
262  
263  /***************************** sndbuf consumer *******************************/
264  
265  /* sndbuf consumer: actual data transfer of one target chunk with ISM write */
266  int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
267  		      u32 offset, int signal)
268  {
269  	struct smc_ism_position pos;
270  	int rc;
271  
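	/* the position addresses the peer buffer by token, RMBE index and
	 * offset; any write failure is treated as a peer connection abort
	 */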
272  	memset(&pos, 0, sizeof(pos));
273  	pos.token = conn->peer_token;
274  	pos.index = conn->peer_rmbe_idx;
275  	pos.offset = conn->tx_off + offset;
276  	pos.signal = signal;
277  	rc = smc_ism_write(conn->lgr->smcd, &pos, data, len);
278  	if (rc)
279  		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
280  	return rc;
281  }
282  
283  /* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
284  static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
285  			     int num_sges, struct ib_rdma_wr *rdma_wr)
286  {
287  	struct smc_link_group *lgr = conn->lgr;
288  	struct smc_link *link = conn->lnk;
289  	int rc;
290  
291  	rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link);
292  	rdma_wr->wr.num_sge = num_sges;
293  	rdma_wr->remote_addr =
294  		lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr +
295  		/* RMBE within RMB */
296  		conn->tx_off +
297  		/* offset within RMBE */
298  		peer_rmbe_offset;
299  	rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey;
300  	rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL);
301  	if (rc)
302  		smcr_link_down_cond_sched(link);
303  	return rc;
304  }
305  
306  /* sndbuf consumer */
307  static inline void smc_tx_advance_cursors(struct smc_connection *conn,
308  					  union smc_host_cursor *prod,
309  					  union smc_host_cursor *sent,
310  					  size_t len)
311  {
312  	smc_curs_add(conn->peer_rmbe_size, prod, len);
313  	/* increased in recv tasklet smc_cdc_msg_rcv() */
314  	smp_mb__before_atomic();
315  	/* data in flight reduces usable snd_wnd */
316  	atomic_sub(len, &conn->peer_rmbe_space);
317  	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
318  	smp_mb__after_atomic();
319  	smc_curs_add(conn->sndbuf_desc->len, sent, len);
320  }
321  
322  /* SMC-R helper for smc_tx_rdma_writes() */
323  static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
324  			       size_t src_off, size_t src_len,
325  			       size_t dst_off, size_t dst_len,
326  			       struct smc_rdma_wr *wr_rdma_buf)
327  {
328  	struct smc_link *link = conn->lnk;
329  
330  	dma_addr_t dma_addr =
331  		sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl);
332  	int src_len_sum = src_len, dst_len_sum = dst_len;
333  	int sent_count = src_off;
334  	int srcchunk, dstchunk;
335  	int num_sges;
336  	int rc;
337  
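	/* both the local sndbuf and the peer RMBE are ring buffers, so the
	 * transfer may need up to two destination chunks (RMBE wrap), each
	 * built from up to two source SGEs (sndbuf wrap)
	 */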
338  	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
339  		struct ib_sge *sge =
340  			wr_rdma_buf->wr_tx_rdma[dstchunk].wr.sg_list;
341  
342  		num_sges = 0;
343  		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
344  			sge[srcchunk].addr = dma_addr + src_off;
345  			sge[srcchunk].length = src_len;
346  			num_sges++;
347  
348  			src_off += src_len;
349  			if (src_off >= conn->sndbuf_desc->len)
350  				src_off -= conn->sndbuf_desc->len;
351  						/* modulo in send ring */
352  			if (src_len_sum == dst_len)
353  				break; /* either on 1st or 2nd iteration */
354  			/* prepare next (== 2nd) iteration */
355  			src_len = dst_len - src_len; /* remainder */
356  			src_len_sum += src_len;
357  		}
358  		rc = smc_tx_rdma_write(conn, dst_off, num_sges,
359  				       &wr_rdma_buf->wr_tx_rdma[dstchunk]);
360  		if (rc)
361  			return rc;
362  		if (dst_len_sum == len)
363  			break; /* either on 1st or 2nd iteration */
364  		/* prepare next (== 2nd) iteration */
365  		dst_off = 0; /* modulo offset in RMBE ring buffer */
366  		dst_len = len - dst_len; /* remainder */
367  		dst_len_sum += dst_len;
368  		src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
369  				sent_count);
370  		src_len_sum = src_len;
371  	}
372  	return 0;
373  }
374  
375  /* SMC-D helper for smc_tx_rdma_writes() */
376  static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
377  			       size_t src_off, size_t src_len,
378  			       size_t dst_off, size_t dst_len)
379  {
380  	int src_len_sum = src_len, dst_len_sum = dst_len;
381  	int srcchunk, dstchunk;
382  	int rc;
383  
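	/* same two-level chunking as in smcr_tx_rdma_writes(), but each chunk
	 * is moved with an ISM write; the destination offset is shifted by
	 * the size of the smcd_cdc_msg header
	 */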
384  	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
385  		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
386  			void *data = conn->sndbuf_desc->cpu_addr + src_off;
387  
388  			rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
389  					       sizeof(struct smcd_cdc_msg), 0);
390  			if (rc)
391  				return rc;
392  			dst_off += src_len;
393  			src_off += src_len;
394  			if (src_off >= conn->sndbuf_desc->len)
395  				src_off -= conn->sndbuf_desc->len;
396  						/* modulo in send ring */
397  			if (src_len_sum == dst_len)
398  				break; /* either on 1st or 2nd iteration */
399  			/* prepare next (== 2nd) iteration */
400  			src_len = dst_len - src_len; /* remainder */
401  			src_len_sum += src_len;
402  		}
403  		if (dst_len_sum == len)
404  			break; /* either on 1st or 2nd iteration */
405  		/* prepare next (== 2nd) iteration */
406  		dst_off = 0; /* modulo offset in RMBE ring buffer */
407  		dst_len = len - dst_len; /* remainder */
408  		dst_len_sum += dst_len;
409  		src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
410  		src_len_sum = src_len;
411  	}
412  	return 0;
413  }
414  
415  /* sndbuf consumer: prepare all necessary (src & dst) chunks for the data transmit;
416   * the usable snd_wnd limits the maximum transmit
417   */
418  static int smc_tx_rdma_writes(struct smc_connection *conn,
419  			      struct smc_rdma_wr *wr_rdma_buf)
420  {
421  	size_t len, src_len, dst_off, dst_len; /* current chunk values */
422  	union smc_host_cursor sent, prep, prod, cons;
423  	struct smc_cdc_producer_flags *pflags;
424  	int to_send, rmbespace;
425  	int rc;
426  
427  	/* source: sndbuf */
428  	smc_curs_copy(&sent, &conn->tx_curs_sent, conn);
429  	smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
430  	/* cf. wmem_alloc - (snd_max - snd_una) */
431  	to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
432  	if (to_send <= 0)
433  		return 0;
434  
435  	/* destination: RMBE */
436  	/* cf. snd_wnd */
437  	rmbespace = atomic_read(&conn->peer_rmbe_space);
438  	if (rmbespace <= 0) {
439  		struct smc_sock *smc = container_of(conn, struct smc_sock,
440  						    conn);
441  		SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
442  		return 0;
443  	}
444  	smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
445  	smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
446  
447  	/* if usable snd_wnd closes, ask peer to advertise once it opens again */
448  	pflags = &conn->local_tx_ctrl.prod_flags;
449  	pflags->write_blocked = (to_send >= rmbespace);
450  	/* cf. usable snd_wnd */
451  	len = min(to_send, rmbespace);
452  
453  	/* initialize variables for first iteration of subsequent nested loop */
454  	dst_off = prod.count;
455  	if (prod.wrap == cons.wrap) {
456  		/* the filled destination area is unwrapped,
457  		 * hence the available free destination space is wrapped
458  		 * and we need 2 destination chunks of sum len; start with 1st
459  		 * which is limited by what's available in sndbuf
460  		 */
461  		dst_len = min_t(size_t,
462  				conn->peer_rmbe_size - prod.count, len);
463  	} else {
464  		/* the filled destination area is wrapped,
465  		 * hence the available free destination space is unwrapped
466  		 * and we need a single destination chunk of entire len
467  		 */
468  		dst_len = len;
469  	}
470  	/* dst_len determines the maximum src_len */
471  	if (sent.count + dst_len <= conn->sndbuf_desc->len) {
472  		/* unwrapped src case: single chunk of entire dst_len */
473  		src_len = dst_len;
474  	} else {
475  		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
476  		src_len = conn->sndbuf_desc->len - sent.count;
477  	}
478  
479  	if (conn->lgr->is_smcd)
480  		rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
481  					 dst_off, dst_len);
482  	else
483  		rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
484  					 dst_off, dst_len, wr_rdma_buf);
485  	if (rc)
486  		return rc;
487  
488  	if (conn->urg_tx_pend && len == to_send)
489  		pflags->urg_data_present = 1;
490  	smc_tx_advance_cursors(conn, &prod, &sent, len);
491  	/* update connection's cursors with advanced local cursors */
492  	smc_curs_copy(&conn->local_tx_ctrl.prod, &prod, conn);
493  							/* dst: peer RMBE */
494  	smc_curs_copy(&conn->tx_curs_sent, &sent, conn);/* src: local sndbuf */
495  
496  	return 0;
497  }
498  
499  /* Wake up sndbuf consumers from any context (IRQ or process)
500   * since there is more data to transmit; usable snd_wnd as max transmit
501   */
502  static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
503  {
504  	struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
505  	struct smc_link *link = conn->lnk;
506  	struct smc_rdma_wr *wr_rdma_buf;
507  	struct smc_cdc_tx_pend *pend;
508  	struct smc_wr_buf *wr_buf;
509  	int rc;
510  
511  	if (!link || !smc_wr_tx_link_hold(link))
512  		return -ENOLINK;
513  	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
514  	if (rc < 0) {
515  		smc_wr_tx_link_put(link);
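		/* -EBUSY: no free CDC send slot right now; unless the
		 * connection is gone, report success and retry via tx_work
		 */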
516  		if (rc == -EBUSY) {
517  			struct smc_sock *smc =
518  				container_of(conn, struct smc_sock, conn);
519  
520  			if (smc->sk.sk_err == ECONNABORTED)
521  				return sock_error(&smc->sk);
522  			if (conn->killed)
523  				return -EPIPE;
524  			rc = 0;
525  			mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
526  					 SMC_TX_WORK_DELAY);
527  		}
528  		return rc;
529  	}
530  
531  	spin_lock_bh(&conn->send_lock);
532  	if (link != conn->lnk) {
533  		/* link of connection changed, tx_work will restart */
534  		smc_wr_tx_put_slot(link,
535  				   (struct smc_wr_tx_pend_priv *)pend);
536  		rc = -ENOLINK;
537  		goto out_unlock;
538  	}
539  	if (!pflags->urg_data_present) {
540  		rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
541  		if (rc) {
542  			smc_wr_tx_put_slot(link,
543  					   (struct smc_wr_tx_pend_priv *)pend);
544  			goto out_unlock;
545  		}
546  	}
547  
548  	rc = smc_cdc_msg_send(conn, wr_buf, pend);
549  	if (!rc && pflags->urg_data_present) {
550  		pflags->urg_data_pending = 0;
551  		pflags->urg_data_present = 0;
552  	}
553  
554  out_unlock:
555  	spin_unlock_bh(&conn->send_lock);
556  	smc_wr_tx_link_put(link);
557  	return rc;
558  }
559  
560  static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
561  {
562  	struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
563  	int rc = 0;
564  
565  	spin_lock_bh(&conn->send_lock);
566  	if (!pflags->urg_data_present)
567  		rc = smc_tx_rdma_writes(conn, NULL);
568  	if (!rc)
569  		rc = smcd_cdc_msg_send(conn);
570  
571  	if (!rc && pflags->urg_data_present) {
572  		pflags->urg_data_pending = 0;
573  		pflags->urg_data_present = 0;
574  	}
575  	spin_unlock_bh(&conn->send_lock);
576  	return rc;
577  }
578  
579  int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
580  {
581  	int rc;
582  
583  	if (conn->killed ||
584  	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
585  		return -EPIPE;	/* connection being aborted */
586  	if (conn->lgr->is_smcd)
587  		rc = smcd_tx_sndbuf_nonempty(conn);
588  	else
589  		rc = smcr_tx_sndbuf_nonempty(conn);
590  
591  	if (!rc) {
592  		/* trigger socket release if connection is closing */
593  		struct smc_sock *smc = container_of(conn, struct smc_sock,
594  						    conn);
595  		smc_close_wake_tx_prepared(smc);
596  	}
597  	return rc;
598  }
599  
600  /* Wake up sndbuf consumers from process context
601   * since there is more data to transmit
602   */
603  void smc_tx_work(struct work_struct *work)
604  {
605  	struct smc_connection *conn = container_of(to_delayed_work(work),
606  						   struct smc_connection,
607  						   tx_work);
608  	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
609  	int rc;
610  
611  	lock_sock(&smc->sk);
612  	if (smc->sk.sk_err)
613  		goto out;
614  
615  	rc = smc_tx_sndbuf_nonempty(conn);
616  	if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
617  	    !atomic_read(&conn->bytes_to_rcv))
618  		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
619  
620  out:
621  	release_sock(&smc->sk);
622  }
623  
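/* Let the peer know how much it may still send: send a CDC message with the
 * updated consumer cursor once enough receive buffer space was freed, when
 * the peer explicitly asked for it (cons_curs_upd_req), or when forced.
 */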
624  void smc_tx_consumer_update(struct smc_connection *conn, bool force)
625  {
626  	union smc_host_cursor cfed, cons, prod;
627  	int sender_free = conn->rmb_desc->len;
628  	int to_confirm;
629  
630  	smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
631  	smc_curs_copy(&cfed, &conn->rx_curs_confirmed, conn);
632  	to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
633  	if (to_confirm > conn->rmbe_update_limit) {
634  		smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn);
635  		sender_free = conn->rmb_desc->len -
636  			      smc_curs_diff_large(conn->rmb_desc->len,
637  						  &cfed, &prod);
638  	}
639  
640  	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
641  	    force ||
642  	    ((to_confirm > conn->rmbe_update_limit) &&
643  	     ((sender_free <= (conn->rmb_desc->len / 2)) ||
644  	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
645  		if (conn->killed ||
646  		    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
647  			return;
648  		if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
649  		    !conn->killed) {
650  			queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
651  					   SMC_TX_WORK_DELAY);
652  			return;
653  		}
654  	}
655  	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
656  	    !atomic_read(&conn->bytes_to_rcv))
657  		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
658  }
659  
660  /***************************** send initialize *******************************/
661  
662  /* Initialize send properties on connection establishment. NB: not __init! */
663  void smc_tx_init(struct smc_sock *smc)
664  {
665  	smc->sk.sk_write_space = smc_tx_write_space;
666  }
667