1 /* 2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 * 32 */ 33 #include <linux/kernel.h> 34 #include <linux/slab.h> 35 #include <linux/rculist.h> 36 #include <linux/llist.h> 37 38 #include "rds_single_path.h" 39 #include "ib_mr.h" 40 #include "rds.h" 41 42 struct workqueue_struct *rds_ib_mr_wq; 43 struct rds_ib_dereg_odp_mr { 44 struct work_struct work; 45 struct ib_mr *mr; 46 }; 47 48 static void rds_ib_odp_mr_worker(struct work_struct *work); 49 50 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr) 51 { 52 struct rds_ib_device *rds_ibdev; 53 struct rds_ib_ipaddr *i_ipaddr; 54 55 rcu_read_lock(); 56 list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) { 57 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 58 if (i_ipaddr->ipaddr == ipaddr) { 59 refcount_inc(&rds_ibdev->refcount); 60 rcu_read_unlock(); 61 return rds_ibdev; 62 } 63 } 64 } 65 rcu_read_unlock(); 66 67 return NULL; 68 } 69 70 static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 71 { 72 struct rds_ib_ipaddr *i_ipaddr; 73 74 i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL); 75 if (!i_ipaddr) 76 return -ENOMEM; 77 78 i_ipaddr->ipaddr = ipaddr; 79 80 spin_lock_irq(&rds_ibdev->spinlock); 81 list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list); 82 spin_unlock_irq(&rds_ibdev->spinlock); 83 84 return 0; 85 } 86 87 static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 88 { 89 struct rds_ib_ipaddr *i_ipaddr; 90 struct rds_ib_ipaddr *to_free = NULL; 91 92 93 spin_lock_irq(&rds_ibdev->spinlock); 94 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 95 if (i_ipaddr->ipaddr == ipaddr) { 96 list_del_rcu(&i_ipaddr->list); 97 to_free = i_ipaddr; 98 break; 99 } 100 } 101 spin_unlock_irq(&rds_ibdev->spinlock); 102 103 if (to_free) 104 kfree_rcu(to_free, rcu); 105 } 106 107 int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, 108 struct in6_addr *ipaddr) 109 { 110 struct rds_ib_device *rds_ibdev_old; 111 112 rds_ibdev_old = rds_ib_get_device(ipaddr->s6_addr32[3]); 113 if (!rds_ibdev_old) 114 return rds_ib_add_ipaddr(rds_ibdev, ipaddr->s6_addr32[3]); 115 116 if (rds_ibdev_old != rds_ibdev) { 117 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr->s6_addr32[3]); 118 rds_ib_dev_put(rds_ibdev_old); 119 return rds_ib_add_ipaddr(rds_ibdev, ipaddr->s6_addr32[3]); 120 } 121 rds_ib_dev_put(rds_ibdev_old); 122 123 return 0; 124 } 125 126 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 127 { 128 struct rds_ib_connection *ic = conn->c_transport_data; 129 130 /* conn was previously on the nodev_conns_list */ 131 spin_lock_irq(&ib_nodev_conns_lock); 132 BUG_ON(list_empty(&ib_nodev_conns)); 133 BUG_ON(list_empty(&ic->ib_node)); 134 list_del(&ic->ib_node); 135 136 spin_lock(&rds_ibdev->spinlock); 137 list_add_tail(&ic->ib_node, &rds_ibdev->conn_list); 138 spin_unlock(&rds_ibdev->spinlock); 139 spin_unlock_irq(&ib_nodev_conns_lock); 140 141 ic->rds_ibdev = rds_ibdev; 142 refcount_inc(&rds_ibdev->refcount); 143 } 144 145 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 146 { 147 struct rds_ib_connection *ic = conn->c_transport_data; 148 149 /* place conn on nodev_conns_list */ 150 spin_lock(&ib_nodev_conns_lock); 151 152 spin_lock_irq(&rds_ibdev->spinlock); 153 BUG_ON(list_empty(&ic->ib_node)); 154 list_del(&ic->ib_node); 155 spin_unlock_irq(&rds_ibdev->spinlock); 156 157 list_add_tail(&ic->ib_node, &ib_nodev_conns); 158 159 spin_unlock(&ib_nodev_conns_lock); 160 161 ic->rds_ibdev = NULL; 162 rds_ib_dev_put(rds_ibdev); 163 } 164 165 void rds_ib_destroy_nodev_conns(void) 166 { 167 struct rds_ib_connection *ic, *_ic; 168 LIST_HEAD(tmp_list); 169 170 /* avoid calling conn_destroy with irqs off */ 171 spin_lock_irq(&ib_nodev_conns_lock); 172 list_splice(&ib_nodev_conns, &tmp_list); 173 spin_unlock_irq(&ib_nodev_conns_lock); 174 175 list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node) 176 rds_conn_destroy(ic->conn); 177 } 178 179 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo) 180 { 181 struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool; 182 183 iinfo->rdma_mr_max = pool_1m->max_items; 184 iinfo->rdma_mr_size = pool_1m->max_pages; 185 } 186 187 #if IS_ENABLED(CONFIG_IPV6) 188 void rds6_ib_get_mr_info(struct rds_ib_device *rds_ibdev, 189 struct rds6_info_rdma_connection *iinfo6) 190 { 191 struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool; 192 193 iinfo6->rdma_mr_max = pool_1m->max_items; 194 iinfo6->rdma_mr_size = pool_1m->max_pages; 195 } 196 #endif 197 198 struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *pool) 199 { 200 struct rds_ib_mr *ibmr = NULL; 201 struct llist_node *ret; 202 unsigned long flags; 203 204 spin_lock_irqsave(&pool->clean_lock, flags); 205 ret = llist_del_first(&pool->clean_list); 206 spin_unlock_irqrestore(&pool->clean_lock, flags); 207 if (ret) { 208 ibmr = llist_entry(ret, struct rds_ib_mr, llnode); 209 if (pool->pool_type == RDS_IB_MR_8K_POOL) 210 rds_ib_stats_inc(s_ib_rdma_mr_8k_reused); 211 else 212 rds_ib_stats_inc(s_ib_rdma_mr_1m_reused); 213 } 214 215 return ibmr; 216 } 217 218 void rds_ib_sync_mr(void *trans_private, int direction) 219 { 220 struct rds_ib_mr *ibmr = trans_private; 221 struct rds_ib_device *rds_ibdev = ibmr->device; 222 223 if (ibmr->odp) 224 return; 225 226 switch (direction) { 227 case DMA_FROM_DEVICE: 228 ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg, 229 ibmr->sg_dma_len, DMA_BIDIRECTIONAL); 230 break; 231 case DMA_TO_DEVICE: 232 ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg, 233 ibmr->sg_dma_len, DMA_BIDIRECTIONAL); 234 break; 235 } 236 } 237 238 void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr) 239 { 240 struct rds_ib_device *rds_ibdev = ibmr->device; 241 242 if (ibmr->sg_dma_len) { 243 ib_dma_unmap_sg(rds_ibdev->dev, 244 ibmr->sg, ibmr->sg_len, 245 DMA_BIDIRECTIONAL); 246 ibmr->sg_dma_len = 0; 247 } 248 249 /* Release the s/g list */ 250 if (ibmr->sg_len) { 251 unsigned int i; 252 253 for (i = 0; i < ibmr->sg_len; ++i) { 254 struct page *page = sg_page(&ibmr->sg[i]); 255 256 /* FIXME we need a way to tell a r/w MR 257 * from a r/o MR */ 258 WARN_ON(!page->mapping && irqs_disabled()); 259 set_page_dirty(page); 260 put_page(page); 261 } 262 kfree(ibmr->sg); 263 264 ibmr->sg = NULL; 265 ibmr->sg_len = 0; 266 } 267 } 268 269 void rds_ib_teardown_mr(struct rds_ib_mr *ibmr) 270 { 271 unsigned int pinned = ibmr->sg_len; 272 273 __rds_ib_teardown_mr(ibmr); 274 if (pinned) { 275 struct rds_ib_mr_pool *pool = ibmr->pool; 276 277 atomic_sub(pinned, &pool->free_pinned); 278 } 279 } 280 281 static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int free_all) 282 { 283 unsigned int item_count; 284 285 item_count = atomic_read(&pool->item_count); 286 if (free_all) 287 return item_count; 288 289 return 0; 290 } 291 292 /* 293 * given an llist of mrs, put them all into the list_head for more processing 294 */ 295 static unsigned int llist_append_to_list(struct llist_head *llist, 296 struct list_head *list) 297 { 298 struct rds_ib_mr *ibmr; 299 struct llist_node *node; 300 struct llist_node *next; 301 unsigned int count = 0; 302 303 node = llist_del_all(llist); 304 while (node) { 305 next = node->next; 306 ibmr = llist_entry(node, struct rds_ib_mr, llnode); 307 list_add_tail(&ibmr->unmap_list, list); 308 node = next; 309 count++; 310 } 311 return count; 312 } 313 314 /* 315 * this takes a list head of mrs and turns it into linked llist nodes 316 * of clusters. Each cluster has linked llist nodes of 317 * MR_CLUSTER_SIZE mrs that are ready for reuse. 318 */ 319 static void list_to_llist_nodes(struct list_head *list, 320 struct llist_node **nodes_head, 321 struct llist_node **nodes_tail) 322 { 323 struct rds_ib_mr *ibmr; 324 struct llist_node *cur = NULL; 325 struct llist_node **next = nodes_head; 326 327 list_for_each_entry(ibmr, list, unmap_list) { 328 cur = &ibmr->llnode; 329 *next = cur; 330 next = &cur->next; 331 } 332 *next = NULL; 333 *nodes_tail = cur; 334 } 335 336 /* 337 * Flush our pool of MRs. 338 * At a minimum, all currently unused MRs are unmapped. 339 * If the number of MRs allocated exceeds the limit, we also try 340 * to free as many MRs as needed to get back to this limit. 341 */ 342 int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, 343 int free_all, struct rds_ib_mr **ibmr_ret) 344 { 345 struct rds_ib_mr *ibmr; 346 struct llist_node *clean_nodes; 347 struct llist_node *clean_tail; 348 LIST_HEAD(unmap_list); 349 unsigned long unpinned = 0; 350 unsigned int nfreed = 0, dirty_to_clean = 0, free_goal; 351 352 if (pool->pool_type == RDS_IB_MR_8K_POOL) 353 rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush); 354 else 355 rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush); 356 357 if (ibmr_ret) { 358 DEFINE_WAIT(wait); 359 while (!mutex_trylock(&pool->flush_lock)) { 360 ibmr = rds_ib_reuse_mr(pool); 361 if (ibmr) { 362 *ibmr_ret = ibmr; 363 finish_wait(&pool->flush_wait, &wait); 364 goto out_nolock; 365 } 366 367 prepare_to_wait(&pool->flush_wait, &wait, 368 TASK_UNINTERRUPTIBLE); 369 if (llist_empty(&pool->clean_list)) 370 schedule(); 371 372 ibmr = rds_ib_reuse_mr(pool); 373 if (ibmr) { 374 *ibmr_ret = ibmr; 375 finish_wait(&pool->flush_wait, &wait); 376 goto out_nolock; 377 } 378 } 379 finish_wait(&pool->flush_wait, &wait); 380 } else 381 mutex_lock(&pool->flush_lock); 382 383 if (ibmr_ret) { 384 ibmr = rds_ib_reuse_mr(pool); 385 if (ibmr) { 386 *ibmr_ret = ibmr; 387 goto out; 388 } 389 } 390 391 /* Get the list of all MRs to be dropped. Ordering matters - 392 * we want to put drop_list ahead of free_list. 393 */ 394 dirty_to_clean = llist_append_to_list(&pool->drop_list, &unmap_list); 395 dirty_to_clean += llist_append_to_list(&pool->free_list, &unmap_list); 396 if (free_all) { 397 unsigned long flags; 398 399 spin_lock_irqsave(&pool->clean_lock, flags); 400 llist_append_to_list(&pool->clean_list, &unmap_list); 401 spin_unlock_irqrestore(&pool->clean_lock, flags); 402 } 403 404 free_goal = rds_ib_flush_goal(pool, free_all); 405 406 if (list_empty(&unmap_list)) 407 goto out; 408 409 rds_ib_unreg_frmr(&unmap_list, &nfreed, &unpinned, free_goal); 410 411 if (!list_empty(&unmap_list)) { 412 unsigned long flags; 413 414 list_to_llist_nodes(&unmap_list, &clean_nodes, &clean_tail); 415 if (ibmr_ret) { 416 *ibmr_ret = llist_entry(clean_nodes, struct rds_ib_mr, llnode); 417 clean_nodes = clean_nodes->next; 418 } 419 /* more than one entry in llist nodes */ 420 if (clean_nodes) { 421 spin_lock_irqsave(&pool->clean_lock, flags); 422 llist_add_batch(clean_nodes, clean_tail, 423 &pool->clean_list); 424 spin_unlock_irqrestore(&pool->clean_lock, flags); 425 } 426 } 427 428 atomic_sub(unpinned, &pool->free_pinned); 429 atomic_sub(dirty_to_clean, &pool->dirty_count); 430 atomic_sub(nfreed, &pool->item_count); 431 432 out: 433 mutex_unlock(&pool->flush_lock); 434 if (waitqueue_active(&pool->flush_wait)) 435 wake_up(&pool->flush_wait); 436 out_nolock: 437 return 0; 438 } 439 440 struct rds_ib_mr *rds_ib_try_reuse_ibmr(struct rds_ib_mr_pool *pool) 441 { 442 struct rds_ib_mr *ibmr = NULL; 443 int iter = 0; 444 445 while (1) { 446 ibmr = rds_ib_reuse_mr(pool); 447 if (ibmr) 448 return ibmr; 449 450 if (atomic_inc_return(&pool->item_count) <= pool->max_items) 451 break; 452 453 atomic_dec(&pool->item_count); 454 455 if (++iter > 2) { 456 if (pool->pool_type == RDS_IB_MR_8K_POOL) 457 rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted); 458 else 459 rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted); 460 break; 461 } 462 463 /* We do have some empty MRs. Flush them out. */ 464 if (pool->pool_type == RDS_IB_MR_8K_POOL) 465 rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait); 466 else 467 rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait); 468 469 rds_ib_flush_mr_pool(pool, 0, &ibmr); 470 if (ibmr) 471 return ibmr; 472 } 473 474 return NULL; 475 } 476 477 static void rds_ib_mr_pool_flush_worker(struct work_struct *work) 478 { 479 struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work); 480 481 rds_ib_flush_mr_pool(pool, 0, NULL); 482 } 483 484 void rds_ib_free_mr(void *trans_private, int invalidate) 485 { 486 struct rds_ib_mr *ibmr = trans_private; 487 struct rds_ib_mr_pool *pool = ibmr->pool; 488 struct rds_ib_device *rds_ibdev = ibmr->device; 489 490 rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len); 491 492 if (ibmr->odp) { 493 /* A MR created and marked as use_once. We use delayed work, 494 * because there is a change that we are in interrupt and can't 495 * call to ib_dereg_mr() directly. 496 */ 497 INIT_DELAYED_WORK(&ibmr->work, rds_ib_odp_mr_worker); 498 queue_delayed_work(rds_ib_mr_wq, &ibmr->work, 0); 499 return; 500 } 501 502 /* Return it to the pool's free list */ 503 rds_ib_free_frmr_list(ibmr); 504 505 atomic_add(ibmr->sg_len, &pool->free_pinned); 506 atomic_inc(&pool->dirty_count); 507 508 /* If we've pinned too many pages, request a flush */ 509 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned || 510 atomic_read(&pool->dirty_count) >= pool->max_items / 5) 511 queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10); 512 513 if (invalidate) { 514 if (likely(!in_interrupt())) { 515 rds_ib_flush_mr_pool(pool, 0, NULL); 516 } else { 517 /* We get here if the user created a MR marked 518 * as use_once and invalidate at the same time. 519 */ 520 queue_delayed_work(rds_ib_mr_wq, 521 &pool->flush_worker, 10); 522 } 523 } 524 525 rds_ib_dev_put(rds_ibdev); 526 } 527 528 void rds_ib_flush_mrs(void) 529 { 530 struct rds_ib_device *rds_ibdev; 531 532 down_read(&rds_ib_devices_lock); 533 list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { 534 if (rds_ibdev->mr_8k_pool) 535 rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL); 536 537 if (rds_ibdev->mr_1m_pool) 538 rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL); 539 } 540 up_read(&rds_ib_devices_lock); 541 } 542 543 u32 rds_ib_get_lkey(void *trans_private) 544 { 545 struct rds_ib_mr *ibmr = trans_private; 546 547 return ibmr->u.mr->lkey; 548 } 549 550 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, 551 struct rds_sock *rs, u32 *key_ret, 552 struct rds_connection *conn, 553 u64 start, u64 length, int need_odp) 554 { 555 struct rds_ib_device *rds_ibdev; 556 struct rds_ib_mr *ibmr = NULL; 557 struct rds_ib_connection *ic = NULL; 558 int ret; 559 560 rds_ibdev = rds_ib_get_device(rs->rs_bound_addr.s6_addr32[3]); 561 if (!rds_ibdev) { 562 ret = -ENODEV; 563 goto out; 564 } 565 566 if (need_odp == ODP_ZEROBASED || need_odp == ODP_VIRTUAL) { 567 u64 virt_addr = need_odp == ODP_ZEROBASED ? 0 : start; 568 int access_flags = 569 (IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ | 570 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_ATOMIC | 571 IB_ACCESS_ON_DEMAND); 572 struct ib_sge sge = {}; 573 struct ib_mr *ib_mr; 574 575 if (!rds_ibdev->odp_capable) { 576 ret = -EOPNOTSUPP; 577 goto out; 578 } 579 580 ib_mr = ib_reg_user_mr(rds_ibdev->pd, start, length, virt_addr, 581 access_flags); 582 583 if (IS_ERR(ib_mr)) { 584 rdsdebug("rds_ib_get_user_mr returned %d\n", 585 IS_ERR(ib_mr)); 586 ret = PTR_ERR(ib_mr); 587 goto out; 588 } 589 if (key_ret) 590 *key_ret = ib_mr->rkey; 591 592 ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL); 593 if (!ibmr) { 594 ib_dereg_mr(ib_mr); 595 ret = -ENOMEM; 596 goto out; 597 } 598 ibmr->u.mr = ib_mr; 599 ibmr->odp = 1; 600 601 sge.addr = virt_addr; 602 sge.length = length; 603 sge.lkey = ib_mr->lkey; 604 605 ib_advise_mr(rds_ibdev->pd, 606 IB_UVERBS_ADVISE_MR_ADVICE_PREFETCH_WRITE, 607 IB_UVERBS_ADVISE_MR_FLAG_FLUSH, &sge, 1); 608 return ibmr; 609 } 610 611 if (conn) 612 ic = conn->c_transport_data; 613 614 if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) { 615 ret = -ENODEV; 616 goto out; 617 } 618 619 ibmr = rds_ib_reg_frmr(rds_ibdev, ic, sg, nents, key_ret); 620 if (IS_ERR(ibmr)) { 621 ret = PTR_ERR(ibmr); 622 pr_warn("RDS/IB: rds_ib_get_mr failed (errno=%d)\n", ret); 623 } else { 624 return ibmr; 625 } 626 627 out: 628 if (rds_ibdev) 629 rds_ib_dev_put(rds_ibdev); 630 631 return ERR_PTR(ret); 632 } 633 634 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) 635 { 636 cancel_delayed_work_sync(&pool->flush_worker); 637 rds_ib_flush_mr_pool(pool, 1, NULL); 638 WARN_ON(atomic_read(&pool->item_count)); 639 WARN_ON(atomic_read(&pool->free_pinned)); 640 kfree(pool); 641 } 642 643 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev, 644 int pool_type) 645 { 646 struct rds_ib_mr_pool *pool; 647 648 pool = kzalloc(sizeof(*pool), GFP_KERNEL); 649 if (!pool) 650 return ERR_PTR(-ENOMEM); 651 652 pool->pool_type = pool_type; 653 init_llist_head(&pool->free_list); 654 init_llist_head(&pool->drop_list); 655 init_llist_head(&pool->clean_list); 656 spin_lock_init(&pool->clean_lock); 657 mutex_init(&pool->flush_lock); 658 init_waitqueue_head(&pool->flush_wait); 659 INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker); 660 661 if (pool_type == RDS_IB_MR_1M_POOL) { 662 /* +1 allows for unaligned MRs */ 663 pool->max_pages = RDS_MR_1M_MSG_SIZE + 1; 664 pool->max_items = rds_ibdev->max_1m_mrs; 665 } else { 666 /* pool_type == RDS_IB_MR_8K_POOL */ 667 pool->max_pages = RDS_MR_8K_MSG_SIZE + 1; 668 pool->max_items = rds_ibdev->max_8k_mrs; 669 } 670 671 pool->max_free_pinned = pool->max_items * pool->max_pages / 4; 672 pool->max_items_soft = rds_ibdev->max_mrs * 3 / 4; 673 674 return pool; 675 } 676 677 int rds_ib_mr_init(void) 678 { 679 rds_ib_mr_wq = alloc_workqueue("rds_mr_flushd", WQ_MEM_RECLAIM, 0); 680 if (!rds_ib_mr_wq) 681 return -ENOMEM; 682 return 0; 683 } 684 685 /* By the time this is called all the IB devices should have been torn down and 686 * had their pools freed. As each pool is freed its work struct is waited on, 687 * so the pool flushing work queue should be idle by the time we get here. 688 */ 689 void rds_ib_mr_exit(void) 690 { 691 destroy_workqueue(rds_ib_mr_wq); 692 } 693 694 static void rds_ib_odp_mr_worker(struct work_struct *work) 695 { 696 struct rds_ib_mr *ibmr; 697 698 ibmr = container_of(work, struct rds_ib_mr, work.work); 699 ib_dereg_mr(ibmr->u.mr); 700 kfree(ibmr); 701 } 702