1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 4 (c) 2007 Network Appliance, Inc. All Rights Reserved. 5 (c) 2009 NetApp. All Rights Reserved. 6 7 8 ******************************************************************************/ 9 10 #include <linux/tcp.h> 11 #include <linux/slab.h> 12 #include <linux/sunrpc/xprt.h> 13 #include <linux/export.h> 14 #include <linux/sunrpc/bc_xprt.h> 15 16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 17 #define RPCDBG_FACILITY RPCDBG_TRANS 18 #endif 19 20 #define BC_MAX_SLOTS 64U 21 22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt) 23 { 24 return BC_MAX_SLOTS; 25 } 26 27 /* 28 * Helper routines that track the number of preallocation elements 29 * on the transport. 30 */ 31 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) 32 { 33 return xprt->bc_alloc_count < xprt->bc_alloc_max; 34 } 35 36 /* 37 * Free the preallocated rpc_rqst structure and the memory 38 * buffers hanging off of it. 39 */ 40 static void xprt_free_allocation(struct rpc_rqst *req) 41 { 42 struct xdr_buf *xbufp; 43 44 dprintk("RPC: free allocations for req= %p\n", req); 45 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); 46 xbufp = &req->rq_rcv_buf; 47 free_page((unsigned long)xbufp->head[0].iov_base); 48 xbufp = &req->rq_snd_buf; 49 free_page((unsigned long)xbufp->head[0].iov_base); 50 kfree(req); 51 } 52 53 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags) 54 { 55 struct page *page; 56 /* Preallocate one XDR receive buffer */ 57 page = alloc_page(gfp_flags); 58 if (page == NULL) 59 return -ENOMEM; 60 xdr_buf_init(buf, page_address(page), PAGE_SIZE); 61 return 0; 62 } 63 64 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt) 65 { 66 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN; 67 struct rpc_rqst *req; 68 69 /* Pre-allocate one backchannel rpc_rqst */ 70 req = kzalloc(sizeof(*req), gfp_flags); 71 if (req == NULL) 72 return NULL; 73 74 req->rq_xprt = xprt; 75 INIT_LIST_HEAD(&req->rq_bc_list); 76 77 /* Preallocate one XDR receive buffer */ 78 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) { 79 printk(KERN_ERR "Failed to create bc receive xbuf\n"); 80 goto out_free; 81 } 82 req->rq_rcv_buf.len = PAGE_SIZE; 83 84 /* Preallocate one XDR send buffer */ 85 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) { 86 printk(KERN_ERR "Failed to create bc snd xbuf\n"); 87 goto out_free; 88 } 89 return req; 90 out_free: 91 xprt_free_allocation(req); 92 return NULL; 93 } 94 95 /* 96 * Preallocate up to min_reqs structures and related buffers for use 97 * by the backchannel. This function can be called multiple times 98 * when creating new sessions that use the same rpc_xprt. The 99 * preallocated buffers are added to the pool of resources used by 100 * the rpc_xprt. Any one of these resources may be used by an 101 * incoming callback request. It's up to the higher levels in the 102 * stack to enforce that the maximum number of session slots is not 103 * being exceeded. 104 * 105 * Some callback arguments can be large. For example, a pNFS server 106 * using multiple deviceids. The list can be unbound, but the client 107 * has the ability to tell the server the maximum size of the callback 108 * requests. Each deviceID is 16 bytes, so allocate one page 109 * for the arguments to have enough room to receive a number of these 110 * deviceIDs. The NFS client indicates to the pNFS server that its 111 * callback requests can be up to 4096 bytes in size. 112 */ 113 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) 114 { 115 if (!xprt->ops->bc_setup) 116 return 0; 117 return xprt->ops->bc_setup(xprt, min_reqs); 118 } 119 EXPORT_SYMBOL_GPL(xprt_setup_backchannel); 120 121 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) 122 { 123 struct rpc_rqst *req; 124 struct list_head tmp_list; 125 int i; 126 127 dprintk("RPC: setup backchannel transport\n"); 128 129 if (min_reqs > BC_MAX_SLOTS) 130 min_reqs = BC_MAX_SLOTS; 131 132 /* 133 * We use a temporary list to keep track of the preallocated 134 * buffers. Once we're done building the list we splice it 135 * into the backchannel preallocation list off of the rpc_xprt 136 * struct. This helps minimize the amount of time the list 137 * lock is held on the rpc_xprt struct. It also makes cleanup 138 * easier in case of memory allocation errors. 139 */ 140 INIT_LIST_HEAD(&tmp_list); 141 for (i = 0; i < min_reqs; i++) { 142 /* Pre-allocate one backchannel rpc_rqst */ 143 req = xprt_alloc_bc_req(xprt); 144 if (req == NULL) { 145 printk(KERN_ERR "Failed to create bc rpc_rqst\n"); 146 goto out_free; 147 } 148 149 /* Add the allocated buffer to the tmp list */ 150 dprintk("RPC: adding req= %p\n", req); 151 list_add(&req->rq_bc_pa_list, &tmp_list); 152 } 153 154 /* 155 * Add the temporary list to the backchannel preallocation list 156 */ 157 spin_lock(&xprt->bc_pa_lock); 158 list_splice(&tmp_list, &xprt->bc_pa_list); 159 xprt->bc_alloc_count += min_reqs; 160 xprt->bc_alloc_max += min_reqs; 161 atomic_add(min_reqs, &xprt->bc_slot_count); 162 spin_unlock(&xprt->bc_pa_lock); 163 164 dprintk("RPC: setup backchannel transport done\n"); 165 return 0; 166 167 out_free: 168 /* 169 * Memory allocation failed, free the temporary list 170 */ 171 while (!list_empty(&tmp_list)) { 172 req = list_first_entry(&tmp_list, 173 struct rpc_rqst, 174 rq_bc_pa_list); 175 list_del(&req->rq_bc_pa_list); 176 xprt_free_allocation(req); 177 } 178 179 dprintk("RPC: setup backchannel transport failed\n"); 180 return -ENOMEM; 181 } 182 183 /** 184 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures. 185 * @xprt: the transport holding the preallocated strucures 186 * @max_reqs: the maximum number of preallocated structures to destroy 187 * 188 * Since these structures may have been allocated by multiple calls 189 * to xprt_setup_backchannel, we only destroy up to the maximum number 190 * of reqs specified by the caller. 191 */ 192 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) 193 { 194 if (xprt->ops->bc_destroy) 195 xprt->ops->bc_destroy(xprt, max_reqs); 196 } 197 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); 198 199 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) 200 { 201 struct rpc_rqst *req = NULL, *tmp = NULL; 202 203 dprintk("RPC: destroy backchannel transport\n"); 204 205 if (max_reqs == 0) 206 goto out; 207 208 spin_lock_bh(&xprt->bc_pa_lock); 209 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max); 210 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 211 dprintk("RPC: req=%p\n", req); 212 list_del(&req->rq_bc_pa_list); 213 xprt_free_allocation(req); 214 xprt->bc_alloc_count--; 215 atomic_dec(&xprt->bc_slot_count); 216 if (--max_reqs == 0) 217 break; 218 } 219 spin_unlock_bh(&xprt->bc_pa_lock); 220 221 out: 222 dprintk("RPC: backchannel list empty= %s\n", 223 list_empty(&xprt->bc_pa_list) ? "true" : "false"); 224 } 225 226 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, 227 struct rpc_rqst *new) 228 { 229 struct rpc_rqst *req = NULL; 230 231 dprintk("RPC: allocate a backchannel request\n"); 232 if (list_empty(&xprt->bc_pa_list)) { 233 if (!new) 234 goto not_found; 235 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS) 236 goto not_found; 237 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); 238 xprt->bc_alloc_count++; 239 atomic_inc(&xprt->bc_slot_count); 240 } 241 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, 242 rq_bc_pa_list); 243 req->rq_reply_bytes_recvd = 0; 244 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 245 sizeof(req->rq_private_buf)); 246 req->rq_xid = xid; 247 req->rq_connect_cookie = xprt->connect_cookie; 248 dprintk("RPC: backchannel req=%p\n", req); 249 not_found: 250 return req; 251 } 252 253 /* 254 * Return the preallocated rpc_rqst structure and XDR buffers 255 * associated with this rpc_task. 256 */ 257 void xprt_free_bc_request(struct rpc_rqst *req) 258 { 259 struct rpc_xprt *xprt = req->rq_xprt; 260 261 xprt->ops->bc_free_rqst(req); 262 } 263 264 void xprt_free_bc_rqst(struct rpc_rqst *req) 265 { 266 struct rpc_xprt *xprt = req->rq_xprt; 267 268 dprintk("RPC: free backchannel req=%p\n", req); 269 270 req->rq_connect_cookie = xprt->connect_cookie - 1; 271 smp_mb__before_atomic(); 272 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 273 smp_mb__after_atomic(); 274 275 /* 276 * Return it to the list of preallocations so that it 277 * may be reused by a new callback request. 278 */ 279 spin_lock_bh(&xprt->bc_pa_lock); 280 if (xprt_need_to_requeue(xprt)) { 281 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); 282 xprt->bc_alloc_count++; 283 atomic_inc(&xprt->bc_slot_count); 284 req = NULL; 285 } 286 spin_unlock_bh(&xprt->bc_pa_lock); 287 if (req != NULL) { 288 /* 289 * The last remaining session was destroyed while this 290 * entry was in use. Free the entry and don't attempt 291 * to add back to the list because there is no need to 292 * have anymore preallocated entries. 293 */ 294 dprintk("RPC: Last session removed req=%p\n", req); 295 xprt_free_allocation(req); 296 } 297 xprt_put(xprt); 298 } 299 300 /* 301 * One or more rpc_rqst structure have been preallocated during the 302 * backchannel setup. Buffer space for the send and private XDR buffers 303 * has been preallocated as well. Use xprt_alloc_bc_request to allocate 304 * to this request. Use xprt_free_bc_request to return it. 305 * 306 * We know that we're called in soft interrupt context, grab the spin_lock 307 * since there is no need to grab the bottom half spin_lock. 308 * 309 * Return an available rpc_rqst, otherwise NULL if non are available. 310 */ 311 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid) 312 { 313 struct rpc_rqst *req, *new = NULL; 314 315 do { 316 spin_lock(&xprt->bc_pa_lock); 317 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) { 318 if (req->rq_connect_cookie != xprt->connect_cookie) 319 continue; 320 if (req->rq_xid == xid) 321 goto found; 322 } 323 req = xprt_get_bc_request(xprt, xid, new); 324 found: 325 spin_unlock(&xprt->bc_pa_lock); 326 if (new) { 327 if (req != new) 328 xprt_free_allocation(new); 329 break; 330 } else if (req) 331 break; 332 new = xprt_alloc_bc_req(xprt); 333 } while (new); 334 return req; 335 } 336 337 /* 338 * Add callback request to callback list. The callback 339 * service sleeps on the sv_cb_waitq waiting for new 340 * requests. Wake it up after adding enqueing the 341 * request. 342 */ 343 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) 344 { 345 struct rpc_xprt *xprt = req->rq_xprt; 346 struct svc_serv *bc_serv = xprt->bc_serv; 347 348 spin_lock(&xprt->bc_pa_lock); 349 list_del(&req->rq_bc_pa_list); 350 xprt->bc_alloc_count--; 351 spin_unlock(&xprt->bc_pa_lock); 352 353 req->rq_private_buf.len = copied; 354 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); 355 356 dprintk("RPC: add callback request to list\n"); 357 xprt_get(xprt); 358 spin_lock(&bc_serv->sv_cb_lock); 359 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 360 wake_up(&bc_serv->sv_cb_waitq); 361 spin_unlock(&bc_serv->sv_cb_lock); 362 } 363