1b2441318SGreg Kroah-Hartman // SPDX-License-Identifier: GPL-2.0 2f531a5dbSChuck Lever /* 3f531a5dbSChuck Lever * Copyright (c) 2015 Oracle. All rights reserved. 4f531a5dbSChuck Lever * 5f531a5dbSChuck Lever * Support for backward direction RPCs on RPC/RDMA. 6f531a5dbSChuck Lever */ 7f531a5dbSChuck Lever 863cae470SChuck Lever #include <linux/sunrpc/xprt.h> 963cae470SChuck Lever #include <linux/sunrpc/svc.h> 1076566773SChuck Lever #include <linux/sunrpc/svc_xprt.h> 11bd2abef3SChuck Lever #include <linux/sunrpc/svc_rdma.h> 12f531a5dbSChuck Lever 13f531a5dbSChuck Lever #include "xprt_rdma.h" 14b6e717cbSChuck Lever #include <trace/events/rpcrdma.h> 15f531a5dbSChuck Lever 16f531a5dbSChuck Lever #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 17f531a5dbSChuck Lever # define RPCDBG_FACILITY RPCDBG_TRANS 18f531a5dbSChuck Lever #endif 19f531a5dbSChuck Lever 20c8bbe0c7SChuck Lever #undef RPCRDMA_BACKCHANNEL_DEBUG 2163cae470SChuck Lever 22edb41e61SChuck Lever static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt, 23edb41e61SChuck Lever unsigned int count) 24f531a5dbSChuck Lever { 25edb41e61SChuck Lever struct rpc_xprt *xprt = &r_xprt->rx_xprt; 2692f4433eSChuck Lever struct rpcrdma_req *req; 27edb41e61SChuck Lever struct rpc_rqst *rqst; 28edb41e61SChuck Lever unsigned int i; 29edb41e61SChuck Lever 30edb41e61SChuck Lever for (i = 0; i < (count << 1); i++) { 31f531a5dbSChuck Lever struct rpcrdma_regbuf *rb; 32f531a5dbSChuck Lever size_t size; 33f531a5dbSChuck Lever 341769e6a8SChuck Lever req = rpcrdma_req_create(r_xprt, GFP_KERNEL); 351769e6a8SChuck Lever if (!req) 361769e6a8SChuck Lever return -ENOMEM; 37edb41e61SChuck Lever rqst = &req->rl_slot; 38edb41e61SChuck Lever 39edb41e61SChuck Lever rqst->rq_xprt = xprt; 40edb41e61SChuck Lever INIT_LIST_HEAD(&rqst->rq_bc_list); 41edb41e61SChuck Lever __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); 42f7d46681SChuck Lever spin_lock(&xprt->bc_pa_lock); 43edb41e61SChuck Lever list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); 44f7d46681SChuck Lever spin_unlock(&xprt->bc_pa_lock); 45f531a5dbSChuck Lever 4608cf2efdSChuck Lever size = r_xprt->rx_data.inline_rsize; 4713650c23SChuck Lever rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL); 48*8cec3dbaSChuck Lever if (!rb) 49f531a5dbSChuck Lever goto out_fail; 50f531a5dbSChuck Lever req->rl_sendbuf = rb; 51*8cec3dbaSChuck Lever xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(rb), 5262aee0e3SChuck Lever min_t(size_t, size, PAGE_SIZE)); 53edb41e61SChuck Lever } 54f531a5dbSChuck Lever return 0; 55f531a5dbSChuck Lever 56f531a5dbSChuck Lever out_fail: 5792f4433eSChuck Lever rpcrdma_req_destroy(req); 58f531a5dbSChuck Lever return -ENOMEM; 59f531a5dbSChuck Lever } 60f531a5dbSChuck Lever 61f531a5dbSChuck Lever /** 62f531a5dbSChuck Lever * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests 63f531a5dbSChuck Lever * @xprt: transport associated with these backchannel resources 64f531a5dbSChuck Lever * @reqs: number of concurrent incoming requests to expect 65f531a5dbSChuck Lever * 66f531a5dbSChuck Lever * Returns 0 on success; otherwise a negative errno 67f531a5dbSChuck Lever */ 68f531a5dbSChuck Lever int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) 69f531a5dbSChuck Lever { 70f531a5dbSChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 71f531a5dbSChuck Lever int rc; 72f531a5dbSChuck Lever 73f531a5dbSChuck Lever /* The backchannel reply path returns each rpc_rqst to the 74f531a5dbSChuck Lever * bc_pa_list _after_ the reply is sent. If the server is 75f531a5dbSChuck Lever * faster than the client, it can send another backward 76f531a5dbSChuck Lever * direction request before the rpc_rqst is returned to the 77f531a5dbSChuck Lever * list. The client rejects the request in this case. 78f531a5dbSChuck Lever * 79f531a5dbSChuck Lever * Twice as many rpc_rqsts are prepared to ensure there is 80f531a5dbSChuck Lever * always an rpc_rqst available as soon as a reply is sent. 81f531a5dbSChuck Lever */ 82124fa17dSChuck Lever if (reqs > RPCRDMA_BACKWARD_WRS >> 1) 83124fa17dSChuck Lever goto out_err; 84124fa17dSChuck Lever 85edb41e61SChuck Lever rc = rpcrdma_bc_setup_reqs(r_xprt, reqs); 86f531a5dbSChuck Lever if (rc) 87f531a5dbSChuck Lever goto out_free; 88f531a5dbSChuck Lever 89edb41e61SChuck Lever r_xprt->rx_buf.rb_bc_srv_max_requests = reqs; 90fc1eb807SChuck Lever trace_xprtrdma_cb_setup(r_xprt, reqs); 91f531a5dbSChuck Lever return 0; 92f531a5dbSChuck Lever 93f531a5dbSChuck Lever out_free: 94f531a5dbSChuck Lever xprt_rdma_bc_destroy(xprt, reqs); 95f531a5dbSChuck Lever 96124fa17dSChuck Lever out_err: 97f531a5dbSChuck Lever pr_err("RPC: %s: setup backchannel transport failed\n", __func__); 98f531a5dbSChuck Lever return -ENOMEM; 99f531a5dbSChuck Lever } 100f531a5dbSChuck Lever 101f531a5dbSChuck Lever /** 1026b26cc8cSChuck Lever * xprt_rdma_bc_maxpayload - Return maximum backchannel message size 1036b26cc8cSChuck Lever * @xprt: transport 1046b26cc8cSChuck Lever * 1056b26cc8cSChuck Lever * Returns maximum size, in bytes, of a backchannel message 1066b26cc8cSChuck Lever */ 1076b26cc8cSChuck Lever size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) 1086b26cc8cSChuck Lever { 1096b26cc8cSChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 1106b26cc8cSChuck Lever struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 1116b26cc8cSChuck Lever size_t maxmsg; 1126b26cc8cSChuck Lever 1136b26cc8cSChuck Lever maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize); 11462aee0e3SChuck Lever maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE); 1156b26cc8cSChuck Lever return maxmsg - RPCRDMA_HDRLEN_MIN; 1166b26cc8cSChuck Lever } 1176b26cc8cSChuck Lever 118cf73daf5SChuck Lever static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) 11983128a60SChuck Lever { 1207ec910e7SChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 12183128a60SChuck Lever struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 1227ec910e7SChuck Lever __be32 *p; 12383128a60SChuck Lever 1247ec910e7SChuck Lever rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); 1257ec910e7SChuck Lever xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf, 126*8cec3dbaSChuck Lever rdmab_data(req->rl_rdmabuf), rqst); 1277ec910e7SChuck Lever 1287ec910e7SChuck Lever p = xdr_reserve_space(&req->rl_stream, 28); 1297ec910e7SChuck Lever if (unlikely(!p)) 1307ec910e7SChuck Lever return -EIO; 1317ec910e7SChuck Lever *p++ = rqst->rq_xid; 1327ec910e7SChuck Lever *p++ = rpcrdma_version; 1337ec910e7SChuck Lever *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); 1347ec910e7SChuck Lever *p++ = rdma_msg; 1357ec910e7SChuck Lever *p++ = xdr_zero; 1367ec910e7SChuck Lever *p++ = xdr_zero; 1377ec910e7SChuck Lever *p = xdr_zero; 13883128a60SChuck Lever 139857f9acaSChuck Lever if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN, 140655fec69SChuck Lever &rqst->rq_snd_buf, rpcrdma_noch)) 14154cbd6b0SChuck Lever return -EIO; 142fc1eb807SChuck Lever 143fc1eb807SChuck Lever trace_xprtrdma_cb_reply(rqst); 144655fec69SChuck Lever return 0; 14583128a60SChuck Lever } 14683128a60SChuck Lever 14783128a60SChuck Lever /** 148cf73daf5SChuck Lever * xprt_rdma_bc_send_reply - marshal and send a backchannel reply 149cf73daf5SChuck Lever * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf 150cf73daf5SChuck Lever * 151cf73daf5SChuck Lever * Caller holds the transport's write lock. 152cf73daf5SChuck Lever * 153cf73daf5SChuck Lever * Returns: 154cf73daf5SChuck Lever * %0 if the RPC message has been sent 155cf73daf5SChuck Lever * %-ENOTCONN if the caller should reconnect and call again 156cf73daf5SChuck Lever * %-EIO if a permanent error occurred and the request was not 157cf73daf5SChuck Lever * sent. Do not try to send this message again. 158cf73daf5SChuck Lever */ 159cf73daf5SChuck Lever int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst) 160cf73daf5SChuck Lever { 1610c0829bcSChuck Lever struct rpc_xprt *xprt = rqst->rq_xprt; 1620c0829bcSChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 163cf73daf5SChuck Lever struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 164cf73daf5SChuck Lever int rc; 165cf73daf5SChuck Lever 1660c0829bcSChuck Lever if (!xprt_connected(xprt)) 1670c0829bcSChuck Lever return -ENOTCONN; 168cf73daf5SChuck Lever 1690c0829bcSChuck Lever if (!xprt_request_get_cong(xprt, rqst)) 17075891f50STrond Myklebust return -EBADSLT; 17175891f50STrond Myklebust 172cf73daf5SChuck Lever rc = rpcrdma_bc_marshal_reply(rqst); 173cf73daf5SChuck Lever if (rc < 0) 174cf73daf5SChuck Lever goto failed_marshal; 175cf73daf5SChuck Lever 176cf73daf5SChuck Lever if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) 177cf73daf5SChuck Lever goto drop_connection; 178cf73daf5SChuck Lever return 0; 179cf73daf5SChuck Lever 180cf73daf5SChuck Lever failed_marshal: 181cf73daf5SChuck Lever if (rc != -ENOTCONN) 182cf73daf5SChuck Lever return rc; 183cf73daf5SChuck Lever drop_connection: 1840c0829bcSChuck Lever xprt_rdma_close(xprt); 185cf73daf5SChuck Lever return -ENOTCONN; 186cf73daf5SChuck Lever } 187cf73daf5SChuck Lever 188cf73daf5SChuck Lever /** 189f531a5dbSChuck Lever * xprt_rdma_bc_destroy - Release resources for handling backchannel requests 190f531a5dbSChuck Lever * @xprt: transport associated with these backchannel resources 191f531a5dbSChuck Lever * @reqs: number of incoming requests to destroy; ignored 192f531a5dbSChuck Lever */ 193f531a5dbSChuck Lever void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) 194f531a5dbSChuck Lever { 195f531a5dbSChuck Lever struct rpc_rqst *rqst, *tmp; 196f531a5dbSChuck Lever 197f7d46681SChuck Lever spin_lock(&xprt->bc_pa_lock); 198f531a5dbSChuck Lever list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 199f531a5dbSChuck Lever list_del(&rqst->rq_bc_pa_list); 200f7d46681SChuck Lever spin_unlock(&xprt->bc_pa_lock); 201f531a5dbSChuck Lever 20292f4433eSChuck Lever rpcrdma_req_destroy(rpcr_to_rdmar(rqst)); 203f531a5dbSChuck Lever 204f7d46681SChuck Lever spin_lock(&xprt->bc_pa_lock); 205f531a5dbSChuck Lever } 206f7d46681SChuck Lever spin_unlock(&xprt->bc_pa_lock); 207f531a5dbSChuck Lever } 208f531a5dbSChuck Lever 209f531a5dbSChuck Lever /** 210f531a5dbSChuck Lever * xprt_rdma_bc_free_rqst - Release a backchannel rqst 211f531a5dbSChuck Lever * @rqst: request to release 212f531a5dbSChuck Lever */ 213f531a5dbSChuck Lever void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) 214f531a5dbSChuck Lever { 2157c8d9e7cSChuck Lever struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 216f531a5dbSChuck Lever struct rpc_xprt *xprt = rqst->rq_xprt; 217f531a5dbSChuck Lever 2187c8d9e7cSChuck Lever rpcrdma_recv_buffer_put(req->rl_reply); 2197c8d9e7cSChuck Lever req->rl_reply = NULL; 220c8bbe0c7SChuck Lever 221f7d46681SChuck Lever spin_lock(&xprt->bc_pa_lock); 222f531a5dbSChuck Lever list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); 223f7d46681SChuck Lever spin_unlock(&xprt->bc_pa_lock); 224f531a5dbSChuck Lever } 22563cae470SChuck Lever 22663cae470SChuck Lever /** 22763cae470SChuck Lever * rpcrdma_bc_receive_call - Handle a backward direction call 2289ab6d89eSChuck Lever * @r_xprt: transport receiving the call 22963cae470SChuck Lever * @rep: receive buffer containing the call 23063cae470SChuck Lever * 23163cae470SChuck Lever * Operational assumptions: 23263cae470SChuck Lever * o Backchannel credits are ignored, just as the NFS server 23363cae470SChuck Lever * forechannel currently does 23463cae470SChuck Lever * o The ULP manages a replay cache (eg, NFSv4.1 sessions). 23563cae470SChuck Lever * No replay detection is done at the transport level 23663cae470SChuck Lever */ 23763cae470SChuck Lever void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, 23863cae470SChuck Lever struct rpcrdma_rep *rep) 23963cae470SChuck Lever { 24063cae470SChuck Lever struct rpc_xprt *xprt = &r_xprt->rx_xprt; 24163cae470SChuck Lever struct svc_serv *bc_serv; 24263cae470SChuck Lever struct rpcrdma_req *req; 24363cae470SChuck Lever struct rpc_rqst *rqst; 24463cae470SChuck Lever struct xdr_buf *buf; 24563cae470SChuck Lever size_t size; 24663cae470SChuck Lever __be32 *p; 24763cae470SChuck Lever 24841c8f70fSChuck Lever p = xdr_inline_decode(&rep->rr_stream, 0); 24941c8f70fSChuck Lever size = xdr_stream_remaining(&rep->rr_stream); 25041c8f70fSChuck Lever 25163cae470SChuck Lever #ifdef RPCRDMA_BACKCHANNEL_DEBUG 25263cae470SChuck Lever pr_info("RPC: %s: callback XID %08x, length=%u\n", 25341c8f70fSChuck Lever __func__, be32_to_cpup(p), size); 25441c8f70fSChuck Lever pr_info("RPC: %s: %*ph\n", __func__, size, p); 25563cae470SChuck Lever #endif 25663cae470SChuck Lever 25763cae470SChuck Lever /* Grab a free bc rqst */ 25863cae470SChuck Lever spin_lock(&xprt->bc_pa_lock); 25963cae470SChuck Lever if (list_empty(&xprt->bc_pa_list)) { 26063cae470SChuck Lever spin_unlock(&xprt->bc_pa_lock); 26163cae470SChuck Lever goto out_overflow; 26263cae470SChuck Lever } 26363cae470SChuck Lever rqst = list_first_entry(&xprt->bc_pa_list, 26463cae470SChuck Lever struct rpc_rqst, rq_bc_pa_list); 26563cae470SChuck Lever list_del(&rqst->rq_bc_pa_list); 26663cae470SChuck Lever spin_unlock(&xprt->bc_pa_lock); 26763cae470SChuck Lever 26863cae470SChuck Lever /* Prepare rqst */ 26963cae470SChuck Lever rqst->rq_reply_bytes_recvd = 0; 27041c8f70fSChuck Lever rqst->rq_xid = *p; 2719f74660bSChuck Lever 2729f74660bSChuck Lever rqst->rq_private_buf.len = size; 27363cae470SChuck Lever 27463cae470SChuck Lever buf = &rqst->rq_rcv_buf; 27563cae470SChuck Lever memset(buf, 0, sizeof(*buf)); 27663cae470SChuck Lever buf->head[0].iov_base = p; 27763cae470SChuck Lever buf->head[0].iov_len = size; 27863cae470SChuck Lever buf->len = size; 27963cae470SChuck Lever 28063cae470SChuck Lever /* The receive buffer has to be hooked to the rpcrdma_req 28141c8f70fSChuck Lever * so that it is not released while the req is pointing 28241c8f70fSChuck Lever * to its buffer, and so that it can be reposted after 28341c8f70fSChuck Lever * the Upper Layer is done decoding it. 28463cae470SChuck Lever */ 28563cae470SChuck Lever req = rpcr_to_rdmar(rqst); 28663cae470SChuck Lever req->rl_reply = rep; 287fc1eb807SChuck Lever trace_xprtrdma_cb_call(rqst); 28863cae470SChuck Lever 28963cae470SChuck Lever /* Queue rqst for ULP's callback service */ 29063cae470SChuck Lever bc_serv = xprt->bc_serv; 29163cae470SChuck Lever spin_lock(&bc_serv->sv_cb_lock); 29263cae470SChuck Lever list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); 29363cae470SChuck Lever spin_unlock(&bc_serv->sv_cb_lock); 29463cae470SChuck Lever 29563cae470SChuck Lever wake_up(&bc_serv->sv_cb_waitq); 29663cae470SChuck Lever 29763cae470SChuck Lever r_xprt->rx_stats.bcall_count++; 29863cae470SChuck Lever return; 29963cae470SChuck Lever 30063cae470SChuck Lever out_overflow: 30163cae470SChuck Lever pr_warn("RPC/RDMA backchannel overflow\n"); 3020c0829bcSChuck Lever xprt_force_disconnect(xprt); 30363cae470SChuck Lever /* This receive buffer gets reposted automatically 30463cae470SChuck Lever * when the connection is re-established. 30563cae470SChuck Lever */ 30663cae470SChuck Lever return; 30763cae470SChuck Lever } 308