// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(req);

	kfree(rqst);
}

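/* Set up the transport resources backing one backchannel rpc_rqst:
 * an rpcrdma_req, a buffer for the RPC-over-RDMA transport header
 * (rl_rdmabuf), and a send buffer sized to the inline threshold
 * (rl_sendbuf) for the reply payload.
 */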
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	size_t size;

	req = rpcrdma_create_req(r_xprt);
	if (IS_ERR(req))
		return PTR_ERR(req);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));

	size = r_xprt->rx_data.inline_rsize;
	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_sendbuf = rb;
	xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
		     min_t(size_t, size, PAGE_SIZE));
	rpcrdma_set_xprtdata(rqst, req);
	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of rep's. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	int rc = 0;

	while (count--) {
		rc = rpcrdma_create_rep(r_xprt);
		if (rc)
			break;
	}
	return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst)
			goto out_free;

		dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);
		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
	request_module("svcrdma");
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
	int ret;

	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
	if (ret < 0)
		return ret;
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	size_t maxmsg;

	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}

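/* Marshal the transport header for a backchannel reply. Only the
 * fixed 28-byte (RPCRDMA_HDRLEN_MIN) portion is built: XID, version,
 * credit value, RDMA_MSG, and three empty chunk lists. The reply
 * payload in rq_snd_buf is sent entirely inline (rpcrdma_noch).
 */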
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch))
		return -EIO;

	trace_xprtrdma_cb_reply(rqst);
	return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(rqst->rq_xprt))
		goto drop_connection;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;
	return 0;

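	/* Only -ENOTCONN is treated as transient: the connection is
	 * dropped so the caller reconnects and calls again. All other
	 * marshaling errors are permanent and returned to the caller.
	 */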
failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_disconnect_done(rqst->rq_xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

	dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
		__func__, rqst, rpcr_to_rdmar(rqst));

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC:       %s: callback XID %08x, length=%u\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC:       %s: %*ph\n", __func__, size, p);
#endif

	/* Grab a free bc rqst */
	spin_lock(&xprt->bc_pa_lock);
	if (list_empty(&xprt->bc_pa_list)) {
		spin_unlock(&xprt->bc_pa_lock);
		goto out_overflow;
	}
	rqst = list_first_entry(&xprt->bc_pa_list,
				struct rpc_rqst, rq_bc_pa_list);
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);

	/* Prepare rqst */
	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_bytes_sent = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
	spin_unlock(&bc_serv->sv_cb_lock);

	wake_up(&bc_serv->sv_cb_waitq);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_disconnect_done(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
	return;
}