/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "ocfs2_lockingver.h"
#include "stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "cluster/masklog.h"

static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct user_lock_res, l_lksb);
}

static inline int user_check_wait_flag(struct user_lock_res *lockres,
				       int flag)
{
	int ret;

	spin_lock(&lockres->l_lock);
	ret = lockres->l_flags & flag;
	spin_unlock(&lockres->l_lock);

	return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return &ip->ip_vfs_inode;
}

static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
	spin_lock(&lockres->l_lock);
	lockres->l_flags &= ~USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {			\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "		\
	     "resource %.*s\n", _stat, _func,				\
	     _lockres->l_namelen, _lockres->l_name);			\
} while (0)
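
/*
 * Quick reference for the level arithmetic used by the helper below:
 * with the three modes ordered NL < PR < EX,
 * user_highest_compat_lock_level() answers "what is the highest level
 * we can keep while another node's blocking request is granted?"
 *
 *	blocking request is EX  ->  we must drop to NL
 *	blocking request is PR  ->  we may keep PR
 */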

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
	int status;

	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level,
	     lockres->l_requested);

	spin_lock(&lockres->l_lock);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
	if (status) {
		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
		     status, lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		return;
	}

	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
			"Lockres %.*s, requested ivmode. flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* we're downconverting. */
	if (lockres->l_requested < lockres->l_level) {
		if (lockres->l_requested <=
		    user_highest_compat_lock_level(lockres->l_blocking)) {
			lockres->l_blocking = DLM_LOCK_NL;
			lockres->l_flags &= ~USER_LOCK_BLOCKED;
		}
	}

	lockres->l_level = lockres->l_requested;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_flags |= USER_LOCK_ATTACHED;
	lockres->l_flags &= ~USER_LOCK_BUSY;

	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	if (!igrab(inode))
		BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);

static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
		user_dlm_grab_inode_ref(lockres);

		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

		queue_work(user_dlm_worker, &lockres->l_work);
		lockres->l_flags |= USER_LOCK_QUEUED;
	}
}

static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
	int queue = 0;

	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
		return;

	switch (lockres->l_blocking) {
	case DLM_LOCK_EX:
		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
			queue = 1;
		break;
	case DLM_LOCK_PR:
		if (!lockres->l_ex_holders)
			queue = 1;
		break;
	default:
		BUG();
	}

	if (queue)
		__user_dlm_queue_lockres(lockres);
}

static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);

	spin_lock(&lockres->l_lock);
	lockres->l_flags |= USER_LOCK_BLOCKED;
	if (level > lockres->l_blocking)
		lockres->l_blocking = level;

	__user_dlm_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}
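
/*
 * Unlock ASTs complete two different paths: the real unlock issued by
 * user_dlm_destroy_lock() (teardown), and the DLM_LKF_CANCEL request
 * issued by the unblock worker to abort an in-flight convert. The flag
 * checks below tell those cases apart.
 */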

static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_flags);

	if (status)
		mlog(ML_ERROR, "dlm returns status %d\n", status);

	spin_lock(&lockres->l_lock);
	/* The teardown flag gets set early during the unlock process,
	 * so test the cancel flag to make sure that this ast isn't
	 * for a concurrent cancel. */
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
		lockres->l_level = DLM_LOCK_IV;
	} else if (status == DLM_CANCELGRANT) {
		/* We tried to cancel a convert request, but it was
		 * already granted. Don't clear the busy flag - the
		 * ast should've done this already. */
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		goto out_noclear;
	} else {
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		/* Cancel succeeded, we want to re-queue */
		lockres->l_requested = DLM_LOCK_IV; /* cancel an
						     * upconvert
						     * request. */
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		/* we want the unblock thread to look at it again
		 * now. */
		if (lockres->l_flags & USER_LOCK_BLOCKED)
			__user_dlm_queue_lockres(lockres);
	}

	lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= user_ast,
	.lp_blocking_ast	= user_bast,
	.lp_unlock_ast		= user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	iput(inode);
}
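
/*
 * Worker for the downconvert requests queued by user_bast(). Runs off
 * user_dlm_worker with an inode reference taken at queue time. It
 * either bails out (no longer blocked, in teardown, busy with an
 * in-flight request that must first be cancelled, or incompatible
 * holders still around) or converts the lock down to the highest level
 * still compatible with the blocking request.
 */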

static void user_dlm_unblock_lock(struct work_struct *work)
{
	int new_level, status;
	struct user_lock_res *lockres =
		container_of(work, struct user_lock_res, l_work);
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
			"Lockres %.*s, flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
	 * set, we want user_ast to clear it. */
	lockres->l_flags &= ~USER_LOCK_QUEUED;

	/* It's valid to get here and no longer be blocked - if we get
	 * several basts in a row, we might be queued by the first
	 * one, the unblock thread might run and clear the queued
	 * flag, and finally we might get another bast which re-queues
	 * us before our ast for the downconvert is called. */
	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_BUSY) {
		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
			     lockres->l_namelen, lockres->l_name);
			spin_unlock(&lockres->l_lock);
			goto drop_ref;
		}

		lockres->l_flags |= USER_LOCK_IN_CANCEL;
		spin_unlock(&lockres->l_lock);

		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
					  DLM_LKF_CANCEL);
		if (status)
			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto drop_ref;
	}

	/* If there are still incompat holders, we can exit safely
	 * without worrying about re-queueing this lock as that will
	 * happen on the last call to user_dlm_cluster_unlock. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders, lockres->l_ro_holders);
		goto drop_ref;
	}

	if ((lockres->l_blocking == DLM_LOCK_PR)
	    && lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders);
		goto drop_ref;
	}

	/* yay, we can downconvert now. */
	new_level = user_highest_compat_lock_level(lockres->l_blocking);
	lockres->l_requested = new_level;
	lockres->l_flags |= USER_LOCK_BUSY;
	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
	spin_unlock(&lockres->l_lock);

	/* need lock downconvert request now... */
	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
				lockres->l_name,
				lockres->l_namelen);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
		user_recover_from_dlm_error(lockres);
	}

drop_ref:
	user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}
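
/*
 * Holder counts are what keep a blocked lock from being downconverted
 * underneath its users: __user_dlm_cond_queue_lockres() only queues the
 * unblock worker once the holders that conflict with the blocking level
 * (see user_dlm_cluster_unlock() below) have all dropped away.
 */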

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
				  int wanted)
{
	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}

int user_dlm_cluster_lock(struct user_lock_res *lockres,
			  int level,
			  int lkm_flags)
{
	int status, local_flags;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		status = -EINVAL;
		goto bail;
	}

	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
	     lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
	if (signal_pending(current)) {
		status = -ERESTARTSYS;
		goto bail;
	}

	spin_lock(&lockres->l_lock);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if ((lockres->l_flags & USER_LOCK_BUSY) &&
	    (level > lockres->l_level)) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
	    (!user_may_continue_on_blocked_lock(lockres, level))) {
		/* the lock is currently blocked on behalf of
		 * another node. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_blocked_lock(lockres);
		goto again;
	}

	if (level > lockres->l_level) {
		local_flags = lkm_flags | DLM_LKF_VALBLK;
		if (lockres->l_level != DLM_LOCK_IV)
			local_flags |= DLM_LKF_CONVERT;

		lockres->l_requested = level;
		lockres->l_flags |= USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		/* call dlm_lock to upgrade lock now */
		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
					local_flags, lockres->l_name,
					lockres->l_namelen);
		if (status) {
			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
			    (status != -EAGAIN))
				user_log_dlm_error("ocfs2_dlm_lock",
						   status, lockres);
			user_recover_from_dlm_error(lockres);
			goto bail;
		}

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	user_dlm_inc_holders(lockres, level);
	spin_unlock(&lockres->l_lock);

	status = 0;
bail:
	return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

void user_dlm_cluster_unlock(struct user_lock_res *lockres,
			     int level)
{
	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		return;
	}

	spin_lock(&lockres->l_lock);
	user_dlm_dec_holders(lockres, level);
	__user_dlm_cond_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);
}
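
/*
 * The LVB helpers below assume the caller already holds the lock at a
 * sufficient level: DLM_LOCK_EX to write, DLM_LOCK_PR or better to
 * read. A minimal sketch of the expected calling pattern (illustrative
 * only - buf/len stand in for the caller's buffer; the real callers
 * are the dlmfs file operations):
 *
 *	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
 *
 *	if (!user_dlm_cluster_lock(lockres, DLM_LOCK_EX, 0)) {
 *		user_dlm_write_lvb(inode, buf, len);
 *		user_dlm_cluster_unlock(lockres, DLM_LOCK_EX);
 *	}
 */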

void user_dlm_write_lvb(struct inode *inode,
			const char *val,
			unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_EX);
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	memcpy(lvb, val, len);

	spin_unlock(&lockres->l_lock);
}

ssize_t user_dlm_read_lvb(struct inode *inode,
			  char *val,
			  unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;
	ssize_t ret = len;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_PR);
	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
		memcpy(val, lvb, len);
	} else
		ret = 0;

	spin_unlock(&lockres->l_lock);
	return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
			    struct dentry *dentry)
{
	memset(lockres, 0, sizeof(*lockres));

	spin_lock_init(&lockres->l_lock);
	init_waitqueue_head(&lockres->l_event);
	lockres->l_level = DLM_LOCK_IV;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_blocking = DLM_LOCK_IV;

	/* should have been checked before getting here. */
	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

	memcpy(lockres->l_name,
	       dentry->d_name.name,
	       dentry->d_name.len);
	lockres->l_namelen = dentry->d_name.len;
}

int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
	int status = -EBUSY;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		return 0;
	}

	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

	while (lockres->l_flags & USER_LOCK_BUSY) {
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);

		spin_lock(&lockres->l_lock);
	}

	if (lockres->l_ro_holders || lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	status = 0;
	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags &= ~USER_LOCK_ATTACHED;
	lockres->l_flags |= USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);

	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto bail;
	}

	user_wait_on_busy_lock(lockres);

	status = 0;
bail:
	return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
	/* We ignore recovery events */
	return;
}

void user_dlm_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name)
{
	int rc;
	struct ocfs2_cluster_connection *conn;

	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
					    &user_dlm_lproto,
					    user_dlm_recovery_handler_noop,
					    NULL, &conn);
	if (rc)
		mlog_errno(rc);

	return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}
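
/*
 * Expected order of calls for the interface exported above (a sketch;
 * dlmfs maps these onto its domain directories and lock files, and
 * domain_name here is just an illustrative struct qstr naming the
 * domain):
 *
 *	conn = user_dlm_register(&domain_name);
 *	user_dlm_lock_res_init(lockres, dentry);
 *	user_dlm_cluster_lock(lockres, level, flags);
 *	...
 *	user_dlm_cluster_unlock(lockres, level);
 *	user_dlm_destroy_lock(lockres);
 *	user_dlm_unregister(conn);
 */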