1 /* 2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the BSD-type 8 * license below: 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 17 * Redistributions in binary form must reproduce the above 18 * copyright notice, this list of conditions and the following 19 * disclaimer in the documentation and/or other materials provided 20 * with the distribution. 21 * 22 * Neither the name of the Network Appliance, Inc. nor the names of 23 * its contributors may be used to endorse or promote products 24 * derived from this software without specific prior written 25 * permission. 26 * 27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 */ 39 40 /* 41 * verbs.c 42 * 43 * Encapsulates the major functions managing: 44 * o adapters 45 * o endpoints 46 * o connections 47 * o buffer memory 48 */ 49 50 #include <linux/interrupt.h> 51 #include <linux/slab.h> 52 #include <linux/prefetch.h> 53 #include <linux/sunrpc/addr.h> 54 #include <asm/bitops.h> 55 #include <linux/module.h> /* try_module_get()/module_put() */ 56 57 #include "xprt_rdma.h" 58 59 /* 60 * Globals/Macros 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_TRANS 65 #endif 66 67 /* 68 * internal functions 69 */ 70 71 static struct workqueue_struct *rpcrdma_receive_wq; 72 73 int 74 rpcrdma_alloc_wq(void) 75 { 76 struct workqueue_struct *recv_wq; 77 78 recv_wq = alloc_workqueue("xprtrdma_receive", 79 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 80 0); 81 if (!recv_wq) 82 return -ENOMEM; 83 84 rpcrdma_receive_wq = recv_wq; 85 return 0; 86 } 87 88 void 89 rpcrdma_destroy_wq(void) 90 { 91 struct workqueue_struct *wq; 92 93 if (rpcrdma_receive_wq) { 94 wq = rpcrdma_receive_wq; 95 rpcrdma_receive_wq = NULL; 96 destroy_workqueue(wq); 97 } 98 } 99 100 static void 101 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) 102 { 103 struct rpcrdma_ep *ep = context; 104 105 pr_err("RPC: %s: %s on device %s ep %p\n", 106 __func__, ib_event_msg(event->event), 107 event->device->name, context); 108 if (ep->rep_connected == 1) { 109 ep->rep_connected = -EIO; 110 rpcrdma_conn_func(ep); 111 wake_up_all(&ep->rep_connect_wait); 112 } 113 } 114 115 /** 116 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC 117 * @cq: completion queue (ignored) 118 * @wc: completed WR 119 * 120 */ 121 static void 122 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 123 { 124 /* WARNING: Only wr_cqe and status are reliable at this point */ 125 if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) 126 pr_err("rpcrdma: Send: %s (%u/0x%x)\n", 127 ib_wc_status_msg(wc->status), 128 wc->status, wc->vendor_err); 129 } 130 131 static void 132 rpcrdma_receive_worker(struct work_struct *work) 133 { 134 struct rpcrdma_rep *rep = 135 container_of(work, struct rpcrdma_rep, rr_work); 136 137 rpcrdma_reply_handler(rep); 138 } 139 140 /* Perform basic sanity checking to avoid using garbage 141 * to update the credit grant value. 142 */ 143 static void 144 rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) 145 { 146 struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf); 147 struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; 148 u32 credits; 149 150 if (rep->rr_len < RPCRDMA_HDRLEN_ERR) 151 return; 152 153 credits = be32_to_cpu(rmsgp->rm_credit); 154 if (credits == 0) 155 credits = 1; /* don't deadlock */ 156 else if (credits > buffer->rb_max_requests) 157 credits = buffer->rb_max_requests; 158 159 atomic_set(&buffer->rb_credits, credits); 160 } 161 162 /** 163 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC 164 * @cq: completion queue (ignored) 165 * @wc: completed WR 166 * 167 */ 168 static void 169 rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) 170 { 171 struct ib_cqe *cqe = wc->wr_cqe; 172 struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, 173 rr_cqe); 174 175 /* WARNING: Only wr_id and status are reliable at this point */ 176 if (wc->status != IB_WC_SUCCESS) 177 goto out_fail; 178 179 /* status == SUCCESS means all fields in wc are trustworthy */ 180 if (wc->opcode != IB_WC_RECV) 181 return; 182 183 dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", 184 __func__, rep, wc->byte_len); 185 186 rep->rr_len = wc->byte_len; 187 ib_dma_sync_single_for_cpu(rep->rr_device, 188 rdmab_addr(rep->rr_rdmabuf), 189 rep->rr_len, DMA_FROM_DEVICE); 190 191 rpcrdma_update_granted_credits(rep); 192 193 out_schedule: 194 queue_work(rpcrdma_receive_wq, &rep->rr_work); 195 return; 196 197 out_fail: 198 if (wc->status != IB_WC_WR_FLUSH_ERR) 199 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", 200 ib_wc_status_msg(wc->status), 201 wc->status, wc->vendor_err); 202 rep->rr_len = RPCRDMA_BAD_LEN; 203 goto out_schedule; 204 } 205 206 static void 207 rpcrdma_flush_cqs(struct rpcrdma_ep *ep) 208 { 209 struct ib_wc wc; 210 211 while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) 212 rpcrdma_receive_wc(NULL, &wc); 213 } 214 215 static int 216 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) 217 { 218 struct rpcrdma_xprt *xprt = id->context; 219 struct rpcrdma_ia *ia = &xprt->rx_ia; 220 struct rpcrdma_ep *ep = &xprt->rx_ep; 221 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 222 struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr; 223 #endif 224 struct ib_qp_attr *attr = &ia->ri_qp_attr; 225 struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr; 226 int connstate = 0; 227 228 switch (event->event) { 229 case RDMA_CM_EVENT_ADDR_RESOLVED: 230 case RDMA_CM_EVENT_ROUTE_RESOLVED: 231 ia->ri_async_rc = 0; 232 complete(&ia->ri_done); 233 break; 234 case RDMA_CM_EVENT_ADDR_ERROR: 235 ia->ri_async_rc = -EHOSTUNREACH; 236 dprintk("RPC: %s: CM address resolution error, ep 0x%p\n", 237 __func__, ep); 238 complete(&ia->ri_done); 239 break; 240 case RDMA_CM_EVENT_ROUTE_ERROR: 241 ia->ri_async_rc = -ENETUNREACH; 242 dprintk("RPC: %s: CM route resolution error, ep 0x%p\n", 243 __func__, ep); 244 complete(&ia->ri_done); 245 break; 246 case RDMA_CM_EVENT_ESTABLISHED: 247 connstate = 1; 248 ib_query_qp(ia->ri_id->qp, attr, 249 IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC, 250 iattr); 251 dprintk("RPC: %s: %d responder resources" 252 " (%d initiator)\n", 253 __func__, attr->max_dest_rd_atomic, 254 attr->max_rd_atomic); 255 goto connected; 256 case RDMA_CM_EVENT_CONNECT_ERROR: 257 connstate = -ENOTCONN; 258 goto connected; 259 case RDMA_CM_EVENT_UNREACHABLE: 260 connstate = -ENETDOWN; 261 goto connected; 262 case RDMA_CM_EVENT_REJECTED: 263 connstate = -ECONNREFUSED; 264 goto connected; 265 case RDMA_CM_EVENT_DISCONNECTED: 266 connstate = -ECONNABORTED; 267 goto connected; 268 case RDMA_CM_EVENT_DEVICE_REMOVAL: 269 connstate = -ENODEV; 270 connected: 271 dprintk("RPC: %s: %sconnected\n", 272 __func__, connstate > 0 ? "" : "dis"); 273 atomic_set(&xprt->rx_buf.rb_credits, 1); 274 ep->rep_connected = connstate; 275 rpcrdma_conn_func(ep); 276 wake_up_all(&ep->rep_connect_wait); 277 /*FALLTHROUGH*/ 278 default: 279 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n", 280 __func__, sap, rpc_get_port(sap), ep, 281 rdma_event_msg(event->event)); 282 break; 283 } 284 285 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 286 if (connstate == 1) { 287 int ird = attr->max_dest_rd_atomic; 288 int tird = ep->rep_remote_cma.responder_resources; 289 290 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n", 291 sap, rpc_get_port(sap), 292 ia->ri_device->name, 293 ia->ri_ops->ro_displayname, 294 xprt->rx_buf.rb_max_requests, 295 ird, ird < 4 && ird < tird / 2 ? " (low!)" : ""); 296 } else if (connstate < 0) { 297 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 298 sap, rpc_get_port(sap), connstate); 299 } 300 #endif 301 302 return 0; 303 } 304 305 static void rpcrdma_destroy_id(struct rdma_cm_id *id) 306 { 307 if (id) { 308 module_put(id->device->owner); 309 rdma_destroy_id(id); 310 } 311 } 312 313 static struct rdma_cm_id * 314 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 315 struct rpcrdma_ia *ia, struct sockaddr *addr) 316 { 317 struct rdma_cm_id *id; 318 int rc; 319 320 init_completion(&ia->ri_done); 321 322 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 323 IB_QPT_RC); 324 if (IS_ERR(id)) { 325 rc = PTR_ERR(id); 326 dprintk("RPC: %s: rdma_create_id() failed %i\n", 327 __func__, rc); 328 return id; 329 } 330 331 ia->ri_async_rc = -ETIMEDOUT; 332 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 333 if (rc) { 334 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 335 __func__, rc); 336 goto out; 337 } 338 wait_for_completion_interruptible_timeout(&ia->ri_done, 339 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 340 341 /* FIXME: 342 * Until xprtrdma supports DEVICE_REMOVAL, the provider must 343 * be pinned while there are active NFS/RDMA mounts to prevent 344 * hangs and crashes at umount time. 345 */ 346 if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { 347 dprintk("RPC: %s: Failed to get device module\n", 348 __func__); 349 ia->ri_async_rc = -ENODEV; 350 } 351 rc = ia->ri_async_rc; 352 if (rc) 353 goto out; 354 355 ia->ri_async_rc = -ETIMEDOUT; 356 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 357 if (rc) { 358 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 359 __func__, rc); 360 goto put; 361 } 362 wait_for_completion_interruptible_timeout(&ia->ri_done, 363 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 364 rc = ia->ri_async_rc; 365 if (rc) 366 goto put; 367 368 return id; 369 put: 370 module_put(id->device->owner); 371 out: 372 rdma_destroy_id(id); 373 return ERR_PTR(rc); 374 } 375 376 /* 377 * Drain any cq, prior to teardown. 378 */ 379 static void 380 rpcrdma_clean_cq(struct ib_cq *cq) 381 { 382 struct ib_wc wc; 383 int count = 0; 384 385 while (1 == ib_poll_cq(cq, 1, &wc)) 386 ++count; 387 388 if (count) 389 dprintk("RPC: %s: flushed %d events (last 0x%x)\n", 390 __func__, count, wc.opcode); 391 } 392 393 /* 394 * Exported functions. 395 */ 396 397 /* 398 * Open and initialize an Interface Adapter. 399 * o initializes fields of struct rpcrdma_ia, including 400 * interface and provider attributes and protection zone. 401 */ 402 int 403 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) 404 { 405 struct rpcrdma_ia *ia = &xprt->rx_ia; 406 int rc; 407 408 ia->ri_dma_mr = NULL; 409 410 ia->ri_id = rpcrdma_create_id(xprt, ia, addr); 411 if (IS_ERR(ia->ri_id)) { 412 rc = PTR_ERR(ia->ri_id); 413 goto out1; 414 } 415 ia->ri_device = ia->ri_id->device; 416 417 ia->ri_pd = ib_alloc_pd(ia->ri_device); 418 if (IS_ERR(ia->ri_pd)) { 419 rc = PTR_ERR(ia->ri_pd); 420 dprintk("RPC: %s: ib_alloc_pd() failed %i\n", 421 __func__, rc); 422 goto out2; 423 } 424 425 if (memreg == RPCRDMA_FRMR) { 426 if (!(ia->ri_device->attrs.device_cap_flags & 427 IB_DEVICE_MEM_MGT_EXTENSIONS) || 428 (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) { 429 dprintk("RPC: %s: FRMR registration " 430 "not supported by HCA\n", __func__); 431 memreg = RPCRDMA_MTHCAFMR; 432 } 433 } 434 if (memreg == RPCRDMA_MTHCAFMR) { 435 if (!ia->ri_device->alloc_fmr) { 436 dprintk("RPC: %s: MTHCAFMR registration " 437 "not supported by HCA\n", __func__); 438 rc = -EINVAL; 439 goto out3; 440 } 441 } 442 443 switch (memreg) { 444 case RPCRDMA_FRMR: 445 ia->ri_ops = &rpcrdma_frwr_memreg_ops; 446 break; 447 case RPCRDMA_ALLPHYSICAL: 448 ia->ri_ops = &rpcrdma_physical_memreg_ops; 449 break; 450 case RPCRDMA_MTHCAFMR: 451 ia->ri_ops = &rpcrdma_fmr_memreg_ops; 452 break; 453 default: 454 printk(KERN_ERR "RPC: Unsupported memory " 455 "registration mode: %d\n", memreg); 456 rc = -ENOMEM; 457 goto out3; 458 } 459 dprintk("RPC: %s: memory registration strategy is '%s'\n", 460 __func__, ia->ri_ops->ro_displayname); 461 462 rwlock_init(&ia->ri_qplock); 463 return 0; 464 465 out3: 466 ib_dealloc_pd(ia->ri_pd); 467 ia->ri_pd = NULL; 468 out2: 469 rpcrdma_destroy_id(ia->ri_id); 470 ia->ri_id = NULL; 471 out1: 472 return rc; 473 } 474 475 /* 476 * Clean up/close an IA. 477 * o if event handles and PD have been initialized, free them. 478 * o close the IA 479 */ 480 void 481 rpcrdma_ia_close(struct rpcrdma_ia *ia) 482 { 483 dprintk("RPC: %s: entering\n", __func__); 484 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { 485 if (ia->ri_id->qp) 486 rdma_destroy_qp(ia->ri_id); 487 rpcrdma_destroy_id(ia->ri_id); 488 ia->ri_id = NULL; 489 } 490 491 /* If the pd is still busy, xprtrdma missed freeing a resource */ 492 if (ia->ri_pd && !IS_ERR(ia->ri_pd)) 493 ib_dealloc_pd(ia->ri_pd); 494 } 495 496 /* 497 * Create unconnected endpoint. 498 */ 499 int 500 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, 501 struct rpcrdma_create_data_internal *cdata) 502 { 503 struct ib_cq *sendcq, *recvcq; 504 unsigned int max_qp_wr; 505 int rc; 506 507 if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { 508 dprintk("RPC: %s: insufficient sge's available\n", 509 __func__); 510 return -ENOMEM; 511 } 512 513 if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) { 514 dprintk("RPC: %s: insufficient wqe's available\n", 515 __func__); 516 return -ENOMEM; 517 } 518 max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; 519 520 /* check provider's send/recv wr limits */ 521 if (cdata->max_requests > max_qp_wr) 522 cdata->max_requests = max_qp_wr; 523 524 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 525 ep->rep_attr.qp_context = ep; 526 ep->rep_attr.srq = NULL; 527 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 528 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; 529 rc = ia->ri_ops->ro_open(ia, ep, cdata); 530 if (rc) 531 return rc; 532 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 533 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; 534 ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; 535 ep->rep_attr.cap.max_recv_sge = 1; 536 ep->rep_attr.cap.max_inline_data = 0; 537 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 538 ep->rep_attr.qp_type = IB_QPT_RC; 539 ep->rep_attr.port_num = ~0; 540 541 dprintk("RPC: %s: requested max: dtos: send %d recv %d; " 542 "iovs: send %d recv %d\n", 543 __func__, 544 ep->rep_attr.cap.max_send_wr, 545 ep->rep_attr.cap.max_recv_wr, 546 ep->rep_attr.cap.max_send_sge, 547 ep->rep_attr.cap.max_recv_sge); 548 549 /* set trigger for requesting send completion */ 550 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; 551 if (ep->rep_cqinit <= 2) 552 ep->rep_cqinit = 0; /* always signal? */ 553 INIT_CQCOUNT(ep); 554 init_waitqueue_head(&ep->rep_connect_wait); 555 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); 556 557 sendcq = ib_alloc_cq(ia->ri_device, NULL, 558 ep->rep_attr.cap.max_send_wr + 1, 559 0, IB_POLL_SOFTIRQ); 560 if (IS_ERR(sendcq)) { 561 rc = PTR_ERR(sendcq); 562 dprintk("RPC: %s: failed to create send CQ: %i\n", 563 __func__, rc); 564 goto out1; 565 } 566 567 recvcq = ib_alloc_cq(ia->ri_device, NULL, 568 ep->rep_attr.cap.max_recv_wr + 1, 569 0, IB_POLL_SOFTIRQ); 570 if (IS_ERR(recvcq)) { 571 rc = PTR_ERR(recvcq); 572 dprintk("RPC: %s: failed to create recv CQ: %i\n", 573 __func__, rc); 574 goto out2; 575 } 576 577 ep->rep_attr.send_cq = sendcq; 578 ep->rep_attr.recv_cq = recvcq; 579 580 /* Initialize cma parameters */ 581 582 /* RPC/RDMA does not use private data */ 583 ep->rep_remote_cma.private_data = NULL; 584 ep->rep_remote_cma.private_data_len = 0; 585 586 /* Client offers RDMA Read but does not initiate */ 587 ep->rep_remote_cma.initiator_depth = 0; 588 if (ia->ri_device->attrs.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ 589 ep->rep_remote_cma.responder_resources = 32; 590 else 591 ep->rep_remote_cma.responder_resources = 592 ia->ri_device->attrs.max_qp_rd_atom; 593 594 ep->rep_remote_cma.retry_count = 7; 595 ep->rep_remote_cma.flow_control = 0; 596 ep->rep_remote_cma.rnr_retry_count = 0; 597 598 return 0; 599 600 out2: 601 ib_free_cq(sendcq); 602 out1: 603 if (ia->ri_dma_mr) 604 ib_dereg_mr(ia->ri_dma_mr); 605 return rc; 606 } 607 608 /* 609 * rpcrdma_ep_destroy 610 * 611 * Disconnect and destroy endpoint. After this, the only 612 * valid operations on the ep are to free it (if dynamically 613 * allocated) or re-create it. 614 */ 615 void 616 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 617 { 618 int rc; 619 620 dprintk("RPC: %s: entering, connected is %d\n", 621 __func__, ep->rep_connected); 622 623 cancel_delayed_work_sync(&ep->rep_connect_worker); 624 625 if (ia->ri_id->qp) 626 rpcrdma_ep_disconnect(ep, ia); 627 628 rpcrdma_clean_cq(ep->rep_attr.recv_cq); 629 rpcrdma_clean_cq(ep->rep_attr.send_cq); 630 631 if (ia->ri_id->qp) { 632 rdma_destroy_qp(ia->ri_id); 633 ia->ri_id->qp = NULL; 634 } 635 636 ib_free_cq(ep->rep_attr.recv_cq); 637 ib_free_cq(ep->rep_attr.send_cq); 638 639 if (ia->ri_dma_mr) { 640 rc = ib_dereg_mr(ia->ri_dma_mr); 641 dprintk("RPC: %s: ib_dereg_mr returned %i\n", 642 __func__, rc); 643 } 644 } 645 646 /* 647 * Connect unconnected endpoint. 648 */ 649 int 650 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 651 { 652 struct rdma_cm_id *id, *old; 653 int rc = 0; 654 int retry_count = 0; 655 656 if (ep->rep_connected != 0) { 657 struct rpcrdma_xprt *xprt; 658 retry: 659 dprintk("RPC: %s: reconnecting...\n", __func__); 660 661 rpcrdma_ep_disconnect(ep, ia); 662 rpcrdma_flush_cqs(ep); 663 664 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); 665 id = rpcrdma_create_id(xprt, ia, 666 (struct sockaddr *)&xprt->rx_data.addr); 667 if (IS_ERR(id)) { 668 rc = -EHOSTUNREACH; 669 goto out; 670 } 671 /* TEMP TEMP TEMP - fail if new device: 672 * Deregister/remarshal *all* requests! 673 * Close and recreate adapter, pd, etc! 674 * Re-determine all attributes still sane! 675 * More stuff I haven't thought of! 676 * Rrrgh! 677 */ 678 if (ia->ri_device != id->device) { 679 printk("RPC: %s: can't reconnect on " 680 "different device!\n", __func__); 681 rpcrdma_destroy_id(id); 682 rc = -ENETUNREACH; 683 goto out; 684 } 685 /* END TEMP */ 686 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); 687 if (rc) { 688 dprintk("RPC: %s: rdma_create_qp failed %i\n", 689 __func__, rc); 690 rpcrdma_destroy_id(id); 691 rc = -ENETUNREACH; 692 goto out; 693 } 694 695 write_lock(&ia->ri_qplock); 696 old = ia->ri_id; 697 ia->ri_id = id; 698 write_unlock(&ia->ri_qplock); 699 700 rdma_destroy_qp(old); 701 rpcrdma_destroy_id(old); 702 } else { 703 dprintk("RPC: %s: connecting...\n", __func__); 704 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); 705 if (rc) { 706 dprintk("RPC: %s: rdma_create_qp failed %i\n", 707 __func__, rc); 708 /* do not update ep->rep_connected */ 709 return -ENETUNREACH; 710 } 711 } 712 713 ep->rep_connected = 0; 714 715 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); 716 if (rc) { 717 dprintk("RPC: %s: rdma_connect() failed with %i\n", 718 __func__, rc); 719 goto out; 720 } 721 722 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); 723 724 /* 725 * Check state. A non-peer reject indicates no listener 726 * (ECONNREFUSED), which may be a transient state. All 727 * others indicate a transport condition which has already 728 * undergone a best-effort. 729 */ 730 if (ep->rep_connected == -ECONNREFUSED && 731 ++retry_count <= RDMA_CONNECT_RETRY_MAX) { 732 dprintk("RPC: %s: non-peer_reject, retry\n", __func__); 733 goto retry; 734 } 735 if (ep->rep_connected <= 0) { 736 /* Sometimes, the only way to reliably connect to remote 737 * CMs is to use same nonzero values for ORD and IRD. */ 738 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 && 739 (ep->rep_remote_cma.responder_resources == 0 || 740 ep->rep_remote_cma.initiator_depth != 741 ep->rep_remote_cma.responder_resources)) { 742 if (ep->rep_remote_cma.responder_resources == 0) 743 ep->rep_remote_cma.responder_resources = 1; 744 ep->rep_remote_cma.initiator_depth = 745 ep->rep_remote_cma.responder_resources; 746 goto retry; 747 } 748 rc = ep->rep_connected; 749 } else { 750 struct rpcrdma_xprt *r_xprt; 751 unsigned int extras; 752 753 dprintk("RPC: %s: connected\n", __func__); 754 755 r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); 756 extras = r_xprt->rx_buf.rb_bc_srv_max_requests; 757 758 if (extras) { 759 rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); 760 if (rc) { 761 pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n", 762 __func__, rc); 763 rc = 0; 764 } 765 } 766 } 767 768 out: 769 if (rc) 770 ep->rep_connected = rc; 771 return rc; 772 } 773 774 /* 775 * rpcrdma_ep_disconnect 776 * 777 * This is separate from destroy to facilitate the ability 778 * to reconnect without recreating the endpoint. 779 * 780 * This call is not reentrant, and must not be made in parallel 781 * on the same endpoint. 782 */ 783 void 784 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 785 { 786 int rc; 787 788 rpcrdma_flush_cqs(ep); 789 rc = rdma_disconnect(ia->ri_id); 790 if (!rc) { 791 /* returns without wait if not connected */ 792 wait_event_interruptible(ep->rep_connect_wait, 793 ep->rep_connected != 1); 794 dprintk("RPC: %s: after wait, %sconnected\n", __func__, 795 (ep->rep_connected == 1) ? "still " : "dis"); 796 } else { 797 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 798 ep->rep_connected = rc; 799 } 800 } 801 802 struct rpcrdma_req * 803 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 804 { 805 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 806 struct rpcrdma_req *req; 807 808 req = kzalloc(sizeof(*req), GFP_KERNEL); 809 if (req == NULL) 810 return ERR_PTR(-ENOMEM); 811 812 INIT_LIST_HEAD(&req->rl_free); 813 spin_lock(&buffer->rb_reqslock); 814 list_add(&req->rl_all, &buffer->rb_allreqs); 815 spin_unlock(&buffer->rb_reqslock); 816 req->rl_cqe.done = rpcrdma_wc_send; 817 req->rl_buffer = &r_xprt->rx_buf; 818 return req; 819 } 820 821 struct rpcrdma_rep * 822 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 823 { 824 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 825 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 826 struct rpcrdma_rep *rep; 827 int rc; 828 829 rc = -ENOMEM; 830 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 831 if (rep == NULL) 832 goto out; 833 834 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, 835 GFP_KERNEL); 836 if (IS_ERR(rep->rr_rdmabuf)) { 837 rc = PTR_ERR(rep->rr_rdmabuf); 838 goto out_free; 839 } 840 841 rep->rr_device = ia->ri_device; 842 rep->rr_cqe.done = rpcrdma_receive_wc; 843 rep->rr_rxprt = r_xprt; 844 INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); 845 return rep; 846 847 out_free: 848 kfree(rep); 849 out: 850 return ERR_PTR(rc); 851 } 852 853 int 854 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) 855 { 856 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 857 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 858 int i, rc; 859 860 buf->rb_max_requests = r_xprt->rx_data.max_requests; 861 buf->rb_bc_srv_max_requests = 0; 862 spin_lock_init(&buf->rb_lock); 863 atomic_set(&buf->rb_credits, 1); 864 865 rc = ia->ri_ops->ro_init(r_xprt); 866 if (rc) 867 goto out; 868 869 INIT_LIST_HEAD(&buf->rb_send_bufs); 870 INIT_LIST_HEAD(&buf->rb_allreqs); 871 spin_lock_init(&buf->rb_reqslock); 872 for (i = 0; i < buf->rb_max_requests; i++) { 873 struct rpcrdma_req *req; 874 875 req = rpcrdma_create_req(r_xprt); 876 if (IS_ERR(req)) { 877 dprintk("RPC: %s: request buffer %d alloc" 878 " failed\n", __func__, i); 879 rc = PTR_ERR(req); 880 goto out; 881 } 882 req->rl_backchannel = false; 883 list_add(&req->rl_free, &buf->rb_send_bufs); 884 } 885 886 INIT_LIST_HEAD(&buf->rb_recv_bufs); 887 for (i = 0; i < buf->rb_max_requests + 2; i++) { 888 struct rpcrdma_rep *rep; 889 890 rep = rpcrdma_create_rep(r_xprt); 891 if (IS_ERR(rep)) { 892 dprintk("RPC: %s: reply buffer %d alloc failed\n", 893 __func__, i); 894 rc = PTR_ERR(rep); 895 goto out; 896 } 897 list_add(&rep->rr_list, &buf->rb_recv_bufs); 898 } 899 900 return 0; 901 out: 902 rpcrdma_buffer_destroy(buf); 903 return rc; 904 } 905 906 static struct rpcrdma_req * 907 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) 908 { 909 struct rpcrdma_req *req; 910 911 req = list_first_entry(&buf->rb_send_bufs, 912 struct rpcrdma_req, rl_free); 913 list_del(&req->rl_free); 914 return req; 915 } 916 917 static struct rpcrdma_rep * 918 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) 919 { 920 struct rpcrdma_rep *rep; 921 922 rep = list_first_entry(&buf->rb_recv_bufs, 923 struct rpcrdma_rep, rr_list); 924 list_del(&rep->rr_list); 925 return rep; 926 } 927 928 static void 929 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) 930 { 931 rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); 932 kfree(rep); 933 } 934 935 void 936 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) 937 { 938 rpcrdma_free_regbuf(ia, req->rl_sendbuf); 939 rpcrdma_free_regbuf(ia, req->rl_rdmabuf); 940 kfree(req); 941 } 942 943 void 944 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 945 { 946 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 947 948 while (!list_empty(&buf->rb_recv_bufs)) { 949 struct rpcrdma_rep *rep; 950 951 rep = rpcrdma_buffer_get_rep_locked(buf); 952 rpcrdma_destroy_rep(ia, rep); 953 } 954 955 spin_lock(&buf->rb_reqslock); 956 while (!list_empty(&buf->rb_allreqs)) { 957 struct rpcrdma_req *req; 958 959 req = list_first_entry(&buf->rb_allreqs, 960 struct rpcrdma_req, rl_all); 961 list_del(&req->rl_all); 962 963 spin_unlock(&buf->rb_reqslock); 964 rpcrdma_destroy_req(ia, req); 965 spin_lock(&buf->rb_reqslock); 966 } 967 spin_unlock(&buf->rb_reqslock); 968 969 ia->ri_ops->ro_destroy(buf); 970 } 971 972 struct rpcrdma_mw * 973 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) 974 { 975 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 976 struct rpcrdma_mw *mw = NULL; 977 978 spin_lock(&buf->rb_mwlock); 979 if (!list_empty(&buf->rb_mws)) { 980 mw = list_first_entry(&buf->rb_mws, 981 struct rpcrdma_mw, mw_list); 982 list_del_init(&mw->mw_list); 983 } 984 spin_unlock(&buf->rb_mwlock); 985 986 if (!mw) 987 pr_err("RPC: %s: no MWs available\n", __func__); 988 return mw; 989 } 990 991 void 992 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) 993 { 994 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 995 996 spin_lock(&buf->rb_mwlock); 997 list_add_tail(&mw->mw_list, &buf->rb_mws); 998 spin_unlock(&buf->rb_mwlock); 999 } 1000 1001 /* 1002 * Get a set of request/reply buffers. 1003 * 1004 * Reply buffer (if available) is attached to send buffer upon return. 1005 */ 1006 struct rpcrdma_req * 1007 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1008 { 1009 struct rpcrdma_req *req; 1010 1011 spin_lock(&buffers->rb_lock); 1012 if (list_empty(&buffers->rb_send_bufs)) 1013 goto out_reqbuf; 1014 req = rpcrdma_buffer_get_req_locked(buffers); 1015 if (list_empty(&buffers->rb_recv_bufs)) 1016 goto out_repbuf; 1017 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); 1018 spin_unlock(&buffers->rb_lock); 1019 return req; 1020 1021 out_reqbuf: 1022 spin_unlock(&buffers->rb_lock); 1023 pr_warn("RPC: %s: out of request buffers\n", __func__); 1024 return NULL; 1025 out_repbuf: 1026 spin_unlock(&buffers->rb_lock); 1027 pr_warn("RPC: %s: out of reply buffers\n", __func__); 1028 req->rl_reply = NULL; 1029 return req; 1030 } 1031 1032 /* 1033 * Put request/reply buffers back into pool. 1034 * Pre-decrement counter/array index. 1035 */ 1036 void 1037 rpcrdma_buffer_put(struct rpcrdma_req *req) 1038 { 1039 struct rpcrdma_buffer *buffers = req->rl_buffer; 1040 struct rpcrdma_rep *rep = req->rl_reply; 1041 1042 req->rl_niovs = 0; 1043 req->rl_reply = NULL; 1044 1045 spin_lock(&buffers->rb_lock); 1046 list_add_tail(&req->rl_free, &buffers->rb_send_bufs); 1047 if (rep) 1048 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1049 spin_unlock(&buffers->rb_lock); 1050 } 1051 1052 /* 1053 * Recover reply buffers from pool. 1054 * This happens when recovering from disconnect. 1055 */ 1056 void 1057 rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1058 { 1059 struct rpcrdma_buffer *buffers = req->rl_buffer; 1060 1061 spin_lock(&buffers->rb_lock); 1062 if (!list_empty(&buffers->rb_recv_bufs)) 1063 req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); 1064 spin_unlock(&buffers->rb_lock); 1065 } 1066 1067 /* 1068 * Put reply buffers back into pool when not attached to 1069 * request. This happens in error conditions. 1070 */ 1071 void 1072 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1073 { 1074 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1075 1076 spin_lock(&buffers->rb_lock); 1077 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1078 spin_unlock(&buffers->rb_lock); 1079 } 1080 1081 /* 1082 * Wrappers for internal-use kmalloc memory registration, used by buffer code. 1083 */ 1084 1085 void 1086 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg) 1087 { 1088 dprintk("RPC: map_one: offset %p iova %llx len %zu\n", 1089 seg->mr_offset, 1090 (unsigned long long)seg->mr_dma, seg->mr_dmalen); 1091 } 1092 1093 /** 1094 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers 1095 * @ia: controlling rpcrdma_ia 1096 * @size: size of buffer to be allocated, in bytes 1097 * @flags: GFP flags 1098 * 1099 * Returns pointer to private header of an area of internally 1100 * registered memory, or an ERR_PTR. The registered buffer follows 1101 * the end of the private header. 1102 * 1103 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for 1104 * receiving the payload of RDMA RECV operations. regbufs are not 1105 * used for RDMA READ/WRITE operations, thus are registered only for 1106 * LOCAL access. 1107 */ 1108 struct rpcrdma_regbuf * 1109 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags) 1110 { 1111 struct rpcrdma_regbuf *rb; 1112 struct ib_sge *iov; 1113 1114 rb = kmalloc(sizeof(*rb) + size, flags); 1115 if (rb == NULL) 1116 goto out; 1117 1118 iov = &rb->rg_iov; 1119 iov->addr = ib_dma_map_single(ia->ri_device, 1120 (void *)rb->rg_base, size, 1121 DMA_BIDIRECTIONAL); 1122 if (ib_dma_mapping_error(ia->ri_device, iov->addr)) 1123 goto out_free; 1124 1125 iov->length = size; 1126 iov->lkey = ia->ri_pd->local_dma_lkey; 1127 rb->rg_size = size; 1128 rb->rg_owner = NULL; 1129 return rb; 1130 1131 out_free: 1132 kfree(rb); 1133 out: 1134 return ERR_PTR(-ENOMEM); 1135 } 1136 1137 /** 1138 * rpcrdma_free_regbuf - deregister and free registered buffer 1139 * @ia: controlling rpcrdma_ia 1140 * @rb: regbuf to be deregistered and freed 1141 */ 1142 void 1143 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) 1144 { 1145 struct ib_sge *iov; 1146 1147 if (!rb) 1148 return; 1149 1150 iov = &rb->rg_iov; 1151 ib_dma_unmap_single(ia->ri_device, 1152 iov->addr, iov->length, DMA_BIDIRECTIONAL); 1153 kfree(rb); 1154 } 1155 1156 /* 1157 * Prepost any receive buffer, then post send. 1158 * 1159 * Receive buffer is donated to hardware, reclaimed upon recv completion. 1160 */ 1161 int 1162 rpcrdma_ep_post(struct rpcrdma_ia *ia, 1163 struct rpcrdma_ep *ep, 1164 struct rpcrdma_req *req) 1165 { 1166 struct ib_device *device = ia->ri_device; 1167 struct ib_send_wr send_wr, *send_wr_fail; 1168 struct rpcrdma_rep *rep = req->rl_reply; 1169 struct ib_sge *iov = req->rl_send_iov; 1170 int i, rc; 1171 1172 if (rep) { 1173 rc = rpcrdma_ep_post_recv(ia, ep, rep); 1174 if (rc) 1175 goto out; 1176 req->rl_reply = NULL; 1177 } 1178 1179 send_wr.next = NULL; 1180 send_wr.wr_cqe = &req->rl_cqe; 1181 send_wr.sg_list = iov; 1182 send_wr.num_sge = req->rl_niovs; 1183 send_wr.opcode = IB_WR_SEND; 1184 1185 for (i = 0; i < send_wr.num_sge; i++) 1186 ib_dma_sync_single_for_device(device, iov[i].addr, 1187 iov[i].length, DMA_TO_DEVICE); 1188 dprintk("RPC: %s: posting %d s/g entries\n", 1189 __func__, send_wr.num_sge); 1190 1191 if (DECR_CQCOUNT(ep) > 0) 1192 send_wr.send_flags = 0; 1193 else { /* Provider must take a send completion every now and then */ 1194 INIT_CQCOUNT(ep); 1195 send_wr.send_flags = IB_SEND_SIGNALED; 1196 } 1197 1198 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail); 1199 if (rc) 1200 dprintk("RPC: %s: ib_post_send returned %i\n", __func__, 1201 rc); 1202 out: 1203 return rc; 1204 } 1205 1206 /* 1207 * (Re)post a receive buffer. 1208 */ 1209 int 1210 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, 1211 struct rpcrdma_ep *ep, 1212 struct rpcrdma_rep *rep) 1213 { 1214 struct ib_recv_wr recv_wr, *recv_wr_fail; 1215 int rc; 1216 1217 recv_wr.next = NULL; 1218 recv_wr.wr_cqe = &rep->rr_cqe; 1219 recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 1220 recv_wr.num_sge = 1; 1221 1222 ib_dma_sync_single_for_cpu(ia->ri_device, 1223 rdmab_addr(rep->rr_rdmabuf), 1224 rdmab_length(rep->rr_rdmabuf), 1225 DMA_BIDIRECTIONAL); 1226 1227 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); 1228 1229 if (rc) 1230 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__, 1231 rc); 1232 return rc; 1233 } 1234 1235 /** 1236 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests 1237 * @r_xprt: transport associated with these backchannel resources 1238 * @min_reqs: minimum number of incoming requests expected 1239 * 1240 * Returns zero if all requested buffers were posted, or a negative errno. 1241 */ 1242 int 1243 rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) 1244 { 1245 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; 1246 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 1247 struct rpcrdma_ep *ep = &r_xprt->rx_ep; 1248 struct rpcrdma_rep *rep; 1249 int rc; 1250 1251 while (count--) { 1252 spin_lock(&buffers->rb_lock); 1253 if (list_empty(&buffers->rb_recv_bufs)) 1254 goto out_reqbuf; 1255 rep = rpcrdma_buffer_get_rep_locked(buffers); 1256 spin_unlock(&buffers->rb_lock); 1257 1258 rc = rpcrdma_ep_post_recv(ia, ep, rep); 1259 if (rc) 1260 goto out_rc; 1261 } 1262 1263 return 0; 1264 1265 out_reqbuf: 1266 spin_unlock(&buffers->rb_lock); 1267 pr_warn("%s: no extra receive buffers\n", __func__); 1268 return -ENOMEM; 1269 1270 out_rc: 1271 rpcrdma_recv_buffer_put(rep); 1272 return rc; 1273 } 1274 1275 /* How many chunk list items fit within our inline buffers? 1276 */ 1277 unsigned int 1278 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt) 1279 { 1280 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 1281 int bytes, segments; 1282 1283 bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize); 1284 bytes -= RPCRDMA_HDRLEN_MIN; 1285 if (bytes < sizeof(struct rpcrdma_segment) * 2) { 1286 pr_warn("RPC: %s: inline threshold too small\n", 1287 __func__); 1288 return 0; 1289 } 1290 1291 segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1); 1292 dprintk("RPC: %s: max chunk list size = %d segments\n", 1293 __func__, segments); 1294 return segments; 1295 } 1296