/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   POLLOUT | POLLWRNORM |
						   POLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close_or_senddone(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
		sk->sk_write_pending--;
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

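/* Note on the copy loop in smc_tx_sendmsg() below: the sndbuf is a ring
 * buffer, so a single sendmsg() may need up to two memcpy_from_msg() chunks.
 * Illustrative numbers (not taken from the code): with sndbuf_size = 16KB,
 * tx_curs_prep at offset 15KB and copylen = 3KB, the first chunk copies 1KB
 * up to the end of the ring and the second chunk copies the remaining 2KB
 * starting at offset 0.
 */
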
/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
				  copylen, conn->sndbuf_size - tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		/* update cursors */
		smc_curs_add(conn->sndbuf_size, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_size */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

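/* Note on smc_tx_rdma_write() below: the RDMA write targets the connection's
 * RMBE inside the peer's registered RMB, so the remote address is the RMB's
 * dma_addr plus (peer_conn_idx - 1) * peer_rmbe_size plus the offset within
 * the RMBE. Illustrative numbers (not taken from the code): with
 * peer_conn_idx = 3, peer_rmbe_size = 64KB and peer_rmbe_offset = 4KB,
 * the write lands 132KB past the start of the peer's RMB.
 */
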
/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_size, sent, len);
}

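/* Note on the chunking in smc_tx_rdma_writes() below: both source (sndbuf)
 * and destination (peer RMBE) are ring buffers, so the transfer is split into
 * up to two destination chunks, each built from up to two source SGEs.
 * When prod.wrap == cons.wrap the filled RMBE area is contiguous and the free
 * space wraps around the end of the ring. Illustrative numbers (not taken
 * from the code): with peer_rmbe_size = 64KB, prod.count = 60KB and
 * cons.count = 8KB, the free space is 4KB at the end of the ring plus 8KB at
 * the start, so any len > 4KB needs two destination chunks.
 */
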
/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_size) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_size - sent.count;
	}
	src_len_sum = src_len;
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr =
				conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
				src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_size)
				src_off -= conn->sndbuf_size;
				/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_size - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
		/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
		/* src: local sndbuf */

	return 0;
}

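/* Note on smc_tx_sndbuf_nonempty() below: it may be invoked from IRQ (tasklet)
 * or process context. If no CDC send slot is free (-EBUSY), the transmit
 * attempt is not dropped but deferred to process context via conn->tx_work,
 * which retries under the socket lock in smc_tx_work().
 */
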
/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
				   &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED) {
				rc = sock_error(&smc->sk);
				goto out_unlock;
			}
			rc = 0;
			schedule_work(&conn->tx_work);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(work,
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_sndbuf_nonempty(conn);
	release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int to_confirm, rc;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);

	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmbe_size / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
					   &wr_buf, &pend);
		if (!rc)
			rc = smc_cdc_msg_send(conn, wr_buf, pend);
		if (rc < 0) {
			schedule_work(&conn->tx_work);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
	INIT_WORK(&smc->conn.tx_work, smc_tx_work);
	spin_lock_init(&smc->conn.send_lock);
}