/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
28 */ 29 30 #include <linux/signal.h> 31 #include <linux/sched/signal.h> 32 33 #include <linux/module.h> 34 #include <linux/fs.h> 35 #include <linux/types.h> 36 #include <linux/crc32.h> 37 38 #include "ocfs2_lockingver.h" 39 #include "stackglue.h" 40 #include "userdlm.h" 41 42 #define MLOG_MASK_PREFIX ML_DLMFS 43 #include "cluster/masklog.h" 44 45 46 static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) 47 { 48 return container_of(lksb, struct user_lock_res, l_lksb); 49 } 50 51 static inline int user_check_wait_flag(struct user_lock_res *lockres, 52 int flag) 53 { 54 int ret; 55 56 spin_lock(&lockres->l_lock); 57 ret = lockres->l_flags & flag; 58 spin_unlock(&lockres->l_lock); 59 60 return ret; 61 } 62 63 static inline void user_wait_on_busy_lock(struct user_lock_res *lockres) 64 65 { 66 wait_event(lockres->l_event, 67 !user_check_wait_flag(lockres, USER_LOCK_BUSY)); 68 } 69 70 static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres) 71 72 { 73 wait_event(lockres->l_event, 74 !user_check_wait_flag(lockres, USER_LOCK_BLOCKED)); 75 } 76 77 /* I heart container_of... 
*/ 78 static inline struct ocfs2_cluster_connection * 79 cluster_connection_from_user_lockres(struct user_lock_res *lockres) 80 { 81 struct dlmfs_inode_private *ip; 82 83 ip = container_of(lockres, 84 struct dlmfs_inode_private, 85 ip_lockres); 86 return ip->ip_conn; 87 } 88 89 static struct inode * 90 user_dlm_inode_from_user_lockres(struct user_lock_res *lockres) 91 { 92 struct dlmfs_inode_private *ip; 93 94 ip = container_of(lockres, 95 struct dlmfs_inode_private, 96 ip_lockres); 97 return &ip->ip_vfs_inode; 98 } 99 100 static inline void user_recover_from_dlm_error(struct user_lock_res *lockres) 101 { 102 spin_lock(&lockres->l_lock); 103 lockres->l_flags &= ~USER_LOCK_BUSY; 104 spin_unlock(&lockres->l_lock); 105 } 106 107 #define user_log_dlm_error(_func, _stat, _lockres) do { \ 108 mlog(ML_ERROR, "Dlm error %d while calling %s on " \ 109 "resource %.*s\n", _stat, _func, \ 110 _lockres->l_namelen, _lockres->l_name); \ 111 } while (0) 112 113 /* WARNING: This function lives in a world where the only three lock 114 * levels are EX, PR, and NL. It *will* have to be adjusted when more 115 * lock types are added. 
*/ 116 static inline int user_highest_compat_lock_level(int level) 117 { 118 int new_level = DLM_LOCK_EX; 119 120 if (level == DLM_LOCK_EX) 121 new_level = DLM_LOCK_NL; 122 else if (level == DLM_LOCK_PR) 123 new_level = DLM_LOCK_PR; 124 return new_level; 125 } 126 127 static void user_ast(struct ocfs2_dlm_lksb *lksb) 128 { 129 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); 130 int status; 131 132 mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n", 133 lockres->l_namelen, lockres->l_name, lockres->l_level, 134 lockres->l_requested); 135 136 spin_lock(&lockres->l_lock); 137 138 status = ocfs2_dlm_lock_status(&lockres->l_lksb); 139 if (status) { 140 mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n", 141 status, lockres->l_namelen, lockres->l_name); 142 spin_unlock(&lockres->l_lock); 143 return; 144 } 145 146 mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV, 147 "Lockres %.*s, requested ivmode. flags 0x%x\n", 148 lockres->l_namelen, lockres->l_name, lockres->l_flags); 149 150 /* we're downconverting. 
*/ 151 if (lockres->l_requested < lockres->l_level) { 152 if (lockres->l_requested <= 153 user_highest_compat_lock_level(lockres->l_blocking)) { 154 lockres->l_blocking = DLM_LOCK_NL; 155 lockres->l_flags &= ~USER_LOCK_BLOCKED; 156 } 157 } 158 159 lockres->l_level = lockres->l_requested; 160 lockres->l_requested = DLM_LOCK_IV; 161 lockres->l_flags |= USER_LOCK_ATTACHED; 162 lockres->l_flags &= ~USER_LOCK_BUSY; 163 164 spin_unlock(&lockres->l_lock); 165 166 wake_up(&lockres->l_event); 167 } 168 169 static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres) 170 { 171 struct inode *inode; 172 inode = user_dlm_inode_from_user_lockres(lockres); 173 if (!igrab(inode)) 174 BUG(); 175 } 176 177 static void user_dlm_unblock_lock(struct work_struct *work); 178 179 static void __user_dlm_queue_lockres(struct user_lock_res *lockres) 180 { 181 if (!(lockres->l_flags & USER_LOCK_QUEUED)) { 182 user_dlm_grab_inode_ref(lockres); 183 184 INIT_WORK(&lockres->l_work, user_dlm_unblock_lock); 185 186 queue_work(user_dlm_worker, &lockres->l_work); 187 lockres->l_flags |= USER_LOCK_QUEUED; 188 } 189 } 190 191 static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres) 192 { 193 int queue = 0; 194 195 if (!(lockres->l_flags & USER_LOCK_BLOCKED)) 196 return; 197 198 switch (lockres->l_blocking) { 199 case DLM_LOCK_EX: 200 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 201 queue = 1; 202 break; 203 case DLM_LOCK_PR: 204 if (!lockres->l_ex_holders) 205 queue = 1; 206 break; 207 default: 208 BUG(); 209 } 210 211 if (queue) 212 __user_dlm_queue_lockres(lockres); 213 } 214 215 static void user_bast(struct ocfs2_dlm_lksb *lksb, int level) 216 { 217 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); 218 219 mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n", 220 lockres->l_namelen, lockres->l_name, level, lockres->l_level); 221 222 spin_lock(&lockres->l_lock); 223 lockres->l_flags |= USER_LOCK_BLOCKED; 224 if (level > 
lockres->l_blocking) 225 lockres->l_blocking = level; 226 227 __user_dlm_queue_lockres(lockres); 228 spin_unlock(&lockres->l_lock); 229 230 wake_up(&lockres->l_event); 231 } 232 233 static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status) 234 { 235 struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); 236 237 mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n", 238 lockres->l_namelen, lockres->l_name, lockres->l_flags); 239 240 if (status) 241 mlog(ML_ERROR, "dlm returns status %d\n", status); 242 243 spin_lock(&lockres->l_lock); 244 /* The teardown flag gets set early during the unlock process, 245 * so test the cancel flag to make sure that this ast isn't 246 * for a concurrent cancel. */ 247 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN 248 && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) { 249 lockres->l_level = DLM_LOCK_IV; 250 } else if (status == DLM_CANCELGRANT) { 251 /* We tried to cancel a convert request, but it was 252 * already granted. Don't clear the busy flag - the 253 * ast should've done this already. */ 254 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); 255 lockres->l_flags &= ~USER_LOCK_IN_CANCEL; 256 goto out_noclear; 257 } else { 258 BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); 259 /* Cancel succeeded, we want to re-queue */ 260 lockres->l_requested = DLM_LOCK_IV; /* cancel an 261 * upconvert 262 * request. */ 263 lockres->l_flags &= ~USER_LOCK_IN_CANCEL; 264 /* we want the unblock thread to look at it again 265 * now. */ 266 if (lockres->l_flags & USER_LOCK_BLOCKED) 267 __user_dlm_queue_lockres(lockres); 268 } 269 270 lockres->l_flags &= ~USER_LOCK_BUSY; 271 out_noclear: 272 spin_unlock(&lockres->l_lock); 273 274 wake_up(&lockres->l_event); 275 } 276 277 /* 278 * This is the userdlmfs locking protocol version. 279 * 280 * See fs/ocfs2/dlmglue.c for more details on locking versions. 
281 */ 282 static struct ocfs2_locking_protocol user_dlm_lproto = { 283 .lp_max_version = { 284 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 285 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 286 }, 287 .lp_lock_ast = user_ast, 288 .lp_blocking_ast = user_bast, 289 .lp_unlock_ast = user_unlock_ast, 290 }; 291 292 static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres) 293 { 294 struct inode *inode; 295 inode = user_dlm_inode_from_user_lockres(lockres); 296 iput(inode); 297 } 298 299 static void user_dlm_unblock_lock(struct work_struct *work) 300 { 301 int new_level, status; 302 struct user_lock_res *lockres = 303 container_of(work, struct user_lock_res, l_work); 304 struct ocfs2_cluster_connection *conn = 305 cluster_connection_from_user_lockres(lockres); 306 307 mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name); 308 309 spin_lock(&lockres->l_lock); 310 311 mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED), 312 "Lockres %.*s, flags 0x%x\n", 313 lockres->l_namelen, lockres->l_name, lockres->l_flags); 314 315 /* notice that we don't clear USER_LOCK_BLOCKED here. If it's 316 * set, we want user_ast clear it. */ 317 lockres->l_flags &= ~USER_LOCK_QUEUED; 318 319 /* It's valid to get here and no longer be blocked - if we get 320 * several basts in a row, we might be queued by the first 321 * one, the unblock thread might run and clear the queued 322 * flag, and finally we might get another bast which re-queues 323 * us before our ast for the downconvert is called. 
*/ 324 if (!(lockres->l_flags & USER_LOCK_BLOCKED)) { 325 mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n", 326 lockres->l_namelen, lockres->l_name); 327 spin_unlock(&lockres->l_lock); 328 goto drop_ref; 329 } 330 331 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { 332 mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n", 333 lockres->l_namelen, lockres->l_name); 334 spin_unlock(&lockres->l_lock); 335 goto drop_ref; 336 } 337 338 if (lockres->l_flags & USER_LOCK_BUSY) { 339 if (lockres->l_flags & USER_LOCK_IN_CANCEL) { 340 mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n", 341 lockres->l_namelen, lockres->l_name); 342 spin_unlock(&lockres->l_lock); 343 goto drop_ref; 344 } 345 346 lockres->l_flags |= USER_LOCK_IN_CANCEL; 347 spin_unlock(&lockres->l_lock); 348 349 status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, 350 DLM_LKF_CANCEL); 351 if (status) 352 user_log_dlm_error("ocfs2_dlm_unlock", status, lockres); 353 goto drop_ref; 354 } 355 356 /* If there are still incompat holders, we can exit safely 357 * without worrying about re-queueing this lock as that will 358 * happen on the last call to user_cluster_unlock. */ 359 if ((lockres->l_blocking == DLM_LOCK_EX) 360 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 361 spin_unlock(&lockres->l_lock); 362 mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n", 363 lockres->l_namelen, lockres->l_name, 364 lockres->l_ex_holders, lockres->l_ro_holders); 365 goto drop_ref; 366 } 367 368 if ((lockres->l_blocking == DLM_LOCK_PR) 369 && lockres->l_ex_holders) { 370 spin_unlock(&lockres->l_lock); 371 mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n", 372 lockres->l_namelen, lockres->l_name, 373 lockres->l_ex_holders); 374 goto drop_ref; 375 } 376 377 /* yay, we can downconvert now. 
*/ 378 new_level = user_highest_compat_lock_level(lockres->l_blocking); 379 lockres->l_requested = new_level; 380 lockres->l_flags |= USER_LOCK_BUSY; 381 mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n", 382 lockres->l_namelen, lockres->l_name, lockres->l_level, new_level); 383 spin_unlock(&lockres->l_lock); 384 385 /* need lock downconvert request now... */ 386 status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb, 387 DLM_LKF_CONVERT|DLM_LKF_VALBLK, 388 lockres->l_name, 389 lockres->l_namelen); 390 if (status) { 391 user_log_dlm_error("ocfs2_dlm_lock", status, lockres); 392 user_recover_from_dlm_error(lockres); 393 } 394 395 drop_ref: 396 user_dlm_drop_inode_ref(lockres); 397 } 398 399 static inline void user_dlm_inc_holders(struct user_lock_res *lockres, 400 int level) 401 { 402 switch(level) { 403 case DLM_LOCK_EX: 404 lockres->l_ex_holders++; 405 break; 406 case DLM_LOCK_PR: 407 lockres->l_ro_holders++; 408 break; 409 default: 410 BUG(); 411 } 412 } 413 414 /* predict what lock level we'll be dropping down to on behalf 415 * of another node, and return true if the currently wanted 416 * level will be compatible with it. 
*/ 417 static inline int 418 user_may_continue_on_blocked_lock(struct user_lock_res *lockres, 419 int wanted) 420 { 421 BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED)); 422 423 return wanted <= user_highest_compat_lock_level(lockres->l_blocking); 424 } 425 426 int user_dlm_cluster_lock(struct user_lock_res *lockres, 427 int level, 428 int lkm_flags) 429 { 430 int status, local_flags; 431 struct ocfs2_cluster_connection *conn = 432 cluster_connection_from_user_lockres(lockres); 433 434 if (level != DLM_LOCK_EX && 435 level != DLM_LOCK_PR) { 436 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 437 lockres->l_namelen, lockres->l_name); 438 status = -EINVAL; 439 goto bail; 440 } 441 442 mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n", 443 lockres->l_namelen, lockres->l_name, level, lkm_flags); 444 445 again: 446 if (signal_pending(current)) { 447 status = -ERESTARTSYS; 448 goto bail; 449 } 450 451 spin_lock(&lockres->l_lock); 452 453 /* We only compare against the currently granted level 454 * here. If the lock is blocked waiting on a downconvert, 455 * we'll get caught below. */ 456 if ((lockres->l_flags & USER_LOCK_BUSY) && 457 (level > lockres->l_level)) { 458 /* is someone sitting in dlm_lock? If so, wait on 459 * them. 
*/ 460 spin_unlock(&lockres->l_lock); 461 462 user_wait_on_busy_lock(lockres); 463 goto again; 464 } 465 466 if ((lockres->l_flags & USER_LOCK_BLOCKED) && 467 (!user_may_continue_on_blocked_lock(lockres, level))) { 468 /* is the lock is currently blocked on behalf of 469 * another node */ 470 spin_unlock(&lockres->l_lock); 471 472 user_wait_on_blocked_lock(lockres); 473 goto again; 474 } 475 476 if (level > lockres->l_level) { 477 local_flags = lkm_flags | DLM_LKF_VALBLK; 478 if (lockres->l_level != DLM_LOCK_IV) 479 local_flags |= DLM_LKF_CONVERT; 480 481 lockres->l_requested = level; 482 lockres->l_flags |= USER_LOCK_BUSY; 483 spin_unlock(&lockres->l_lock); 484 485 BUG_ON(level == DLM_LOCK_IV); 486 BUG_ON(level == DLM_LOCK_NL); 487 488 /* call dlm_lock to upgrade lock now */ 489 status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb, 490 local_flags, lockres->l_name, 491 lockres->l_namelen); 492 if (status) { 493 if ((lkm_flags & DLM_LKF_NOQUEUE) && 494 (status != -EAGAIN)) 495 user_log_dlm_error("ocfs2_dlm_lock", 496 status, lockres); 497 user_recover_from_dlm_error(lockres); 498 goto bail; 499 } 500 501 user_wait_on_busy_lock(lockres); 502 goto again; 503 } 504 505 user_dlm_inc_holders(lockres, level); 506 spin_unlock(&lockres->l_lock); 507 508 status = 0; 509 bail: 510 return status; 511 } 512 513 static inline void user_dlm_dec_holders(struct user_lock_res *lockres, 514 int level) 515 { 516 switch(level) { 517 case DLM_LOCK_EX: 518 BUG_ON(!lockres->l_ex_holders); 519 lockres->l_ex_holders--; 520 break; 521 case DLM_LOCK_PR: 522 BUG_ON(!lockres->l_ro_holders); 523 lockres->l_ro_holders--; 524 break; 525 default: 526 BUG(); 527 } 528 } 529 530 void user_dlm_cluster_unlock(struct user_lock_res *lockres, 531 int level) 532 { 533 if (level != DLM_LOCK_EX && 534 level != DLM_LOCK_PR) { 535 mlog(ML_ERROR, "lockres %.*s: invalid request!\n", 536 lockres->l_namelen, lockres->l_name); 537 return; 538 } 539 540 spin_lock(&lockres->l_lock); 541 
user_dlm_dec_holders(lockres, level); 542 __user_dlm_cond_queue_lockres(lockres); 543 spin_unlock(&lockres->l_lock); 544 } 545 546 void user_dlm_write_lvb(struct inode *inode, 547 const char *val, 548 unsigned int len) 549 { 550 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 551 char *lvb; 552 553 BUG_ON(len > DLM_LVB_LEN); 554 555 spin_lock(&lockres->l_lock); 556 557 BUG_ON(lockres->l_level < DLM_LOCK_EX); 558 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 559 memcpy(lvb, val, len); 560 561 spin_unlock(&lockres->l_lock); 562 } 563 564 ssize_t user_dlm_read_lvb(struct inode *inode, 565 char *val, 566 unsigned int len) 567 { 568 struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; 569 char *lvb; 570 ssize_t ret = len; 571 572 BUG_ON(len > DLM_LVB_LEN); 573 574 spin_lock(&lockres->l_lock); 575 576 BUG_ON(lockres->l_level < DLM_LOCK_PR); 577 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) { 578 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 579 memcpy(val, lvb, len); 580 } else 581 ret = 0; 582 583 spin_unlock(&lockres->l_lock); 584 return ret; 585 } 586 587 void user_dlm_lock_res_init(struct user_lock_res *lockres, 588 struct dentry *dentry) 589 { 590 memset(lockres, 0, sizeof(*lockres)); 591 592 spin_lock_init(&lockres->l_lock); 593 init_waitqueue_head(&lockres->l_event); 594 lockres->l_level = DLM_LOCK_IV; 595 lockres->l_requested = DLM_LOCK_IV; 596 lockres->l_blocking = DLM_LOCK_IV; 597 598 /* should have been checked before getting here. 
*/ 599 BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN); 600 601 memcpy(lockres->l_name, 602 dentry->d_name.name, 603 dentry->d_name.len); 604 lockres->l_namelen = dentry->d_name.len; 605 } 606 607 int user_dlm_destroy_lock(struct user_lock_res *lockres) 608 { 609 int status = -EBUSY; 610 struct ocfs2_cluster_connection *conn = 611 cluster_connection_from_user_lockres(lockres); 612 613 mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name); 614 615 spin_lock(&lockres->l_lock); 616 if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { 617 spin_unlock(&lockres->l_lock); 618 return 0; 619 } 620 621 lockres->l_flags |= USER_LOCK_IN_TEARDOWN; 622 623 while (lockres->l_flags & USER_LOCK_BUSY) { 624 spin_unlock(&lockres->l_lock); 625 626 user_wait_on_busy_lock(lockres); 627 628 spin_lock(&lockres->l_lock); 629 } 630 631 if (lockres->l_ro_holders || lockres->l_ex_holders) { 632 spin_unlock(&lockres->l_lock); 633 goto bail; 634 } 635 636 status = 0; 637 if (!(lockres->l_flags & USER_LOCK_ATTACHED)) { 638 spin_unlock(&lockres->l_lock); 639 goto bail; 640 } 641 642 lockres->l_flags &= ~USER_LOCK_ATTACHED; 643 lockres->l_flags |= USER_LOCK_BUSY; 644 spin_unlock(&lockres->l_lock); 645 646 status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK); 647 if (status) { 648 user_log_dlm_error("ocfs2_dlm_unlock", status, lockres); 649 goto bail; 650 } 651 652 user_wait_on_busy_lock(lockres); 653 654 status = 0; 655 bail: 656 return status; 657 } 658 659 static void user_dlm_recovery_handler_noop(int node_num, 660 void *recovery_data) 661 { 662 /* We ignore recovery events */ 663 return; 664 } 665 666 void user_dlm_set_locking_protocol(void) 667 { 668 ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version); 669 } 670 671 struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name) 672 { 673 int rc; 674 struct ocfs2_cluster_connection *conn; 675 676 rc = ocfs2_cluster_connect_agnostic(name->name, name->len, 677 &user_dlm_lproto, 678 
user_dlm_recovery_handler_noop, 679 NULL, &conn); 680 if (rc) 681 mlog_errno(rc); 682 683 return rc ? ERR_PTR(rc) : conn; 684 } 685 686 void user_dlm_unregister(struct ocfs2_cluster_connection *conn) 687 { 688 ocfs2_cluster_disconnect(conn, 0); 689 } 690