// SPDX-License-Identifier: GPL-2.0-only
/*
 * stack_user.c
 *
 * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
 *
 * Copyright (C) 2007 Oracle. All rights reserved.
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/filelock.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/reboot.h>
#include <linux/sched.h>
#include <linux/uaccess.h>

#include "stackglue.h"

#include <linux/dlm_plock.h>

/*
 * The control protocol starts with a handshake. Until the handshake
 * is complete, the control device will fail all write(2)s.
 *
 * The handshake is simple. First, the client reads until EOF. Each line
 * of output is a supported protocol tag. All protocol tags are a single
 * character followed by a two hex digit version number. Currently the
 * only thing supported is T01, for "Text-based version 0x01". Next, the
 * client writes the version they would like to use, including the newline.
 * Thus, the protocol tag is 'T01\n'. If the version tag written is
 * unknown, -EINVAL is returned. Once the negotiation is complete, the
 * client can start sending messages.
 *
 * The T01 protocol has three messages. First is the "SETN" message.
 * It has the following syntax:
 *
 *  SETN<space><8-char-hex-nodenum><newline>
 *
 * This is 14 characters.
 *
 * The "SETN" message must be the first message following the protocol.
 * It tells ocfs2_control the local node number.
 *
 * Next comes the "SETV" message. It has the following syntax:
 *
 *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * This is 11 characters.
 *
 * The "SETV" message sets the filesystem locking protocol version as
 * negotiated by the client. The client negotiates based on the maximum
 * version advertised in /sys/fs/ocfs2/max_locking_protocol.
 * The major number from the "SETV" message must match
 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ...sp_max_proto.pv_minor.
 *
 * Once this information has been set, mounts will be allowed. From this
 * point on, the "DOWN" message can be sent for node down notification.
 * It has the following syntax:
 *
 *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * eg:
 *
 *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
 *
 * This is 47 characters.
 */

/*
 * Whether or not the client has done the handshake.
 * For now, we have just one protocol version.
 *
 * OCFS2_CONTROL_PROTO is both the text returned by read(2) on the control
 * device and the only tag accepted by the first write(2).
 * OCFS2_CONTROL_PROTO_LEN is its length without a terminating NUL.
 */
#define OCFS2_CONTROL_PROTO "T01\n"
#define OCFS2_CONTROL_PROTO_LEN 4

/*
 * Handshake states, stored per open file in ocfs2_control_private.op_state:
 *
 *  INVALID  - initial state of a freshly opened (zero-allocated) file;
 *             all writes fail with -EINVAL.
 *  READ     - the client has read the whole protocol list; the next
 *             write must be the protocol tag.
 *  PROTOCOL - the protocol tag was accepted; SETN/SETV are allowed.
 *  VALID    - node number and version were installed; DOWN is allowed.
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID (0)
#define OCFS2_CONTROL_HANDSHAKE_READ (1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2)
#define OCFS2_CONTROL_HANDSHAKE_VALID (3)

/*
 * Messages.  The *_TOTAL_LEN values are the exact byte counts a write(2)
 * must carry for the corresponding message; the remaining *_LEN values
 * size the fixed-width fields of the message structures.
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN 4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47
#define OCFS2_TEXT_UUID_LEN 32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8
/* Name of the DLM lock whose LVB carries the locking protocol version. */
#define VERSION_LOCK "version_lock"

/*
 * How a live connection learns its node number and protocol version:
 * WITH_CONTROLD relies on the values written to the control device,
 * NO_CONTROLD relies on the DLM recovery callbacks and the version lock.
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD
};

/*
 * ocfs2_live_connection is refcounted because the filesystem and
 * miscdevice sides can detach in different order. Let's just be safe.
 *
 * NOTE(review): no reference count field is visible in the structure that
 * follows; the "refcounted" statement may be stale - confirm.
107 */ 108 struct ocfs2_live_connection { 109 struct list_head oc_list; 110 struct ocfs2_cluster_connection *oc_conn; 111 enum ocfs2_connection_type oc_type; 112 atomic_t oc_this_node; 113 int oc_our_slot; 114 struct dlm_lksb oc_version_lksb; 115 char oc_lvb[DLM_LVB_LEN]; 116 struct completion oc_sync_wait; 117 wait_queue_head_t oc_wait; 118 }; 119 120 struct ocfs2_control_private { 121 struct list_head op_list; 122 int op_state; 123 int op_this_node; 124 struct ocfs2_protocol_version op_proto; 125 }; 126 127 /* SETN<space><8-char-hex-nodenum><newline> */ 128 struct ocfs2_control_message_setn { 129 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 130 char space; 131 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 132 char newline; 133 }; 134 135 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 136 struct ocfs2_control_message_setv { 137 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 138 char space1; 139 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 140 char space2; 141 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 142 char newline; 143 }; 144 145 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 146 struct ocfs2_control_message_down { 147 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 148 char space1; 149 char uuid[OCFS2_TEXT_UUID_LEN]; 150 char space2; 151 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 152 char newline; 153 }; 154 155 union ocfs2_control_message { 156 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 157 struct ocfs2_control_message_setn u_setn; 158 struct ocfs2_control_message_setv u_setv; 159 struct ocfs2_control_message_down u_down; 160 }; 161 162 static struct ocfs2_stack_plugin ocfs2_user_plugin; 163 164 static atomic_t ocfs2_control_opened; 165 static int ocfs2_control_this_node = -1; 166 static struct ocfs2_protocol_version running_proto; 167 168 static LIST_HEAD(ocfs2_live_connection_list); 169 static LIST_HEAD(ocfs2_control_private_list); 170 static DEFINE_MUTEX(ocfs2_control_lock); 171 172 static inline void 
ocfs2_control_set_handshake_state(struct file *file, 173 int state) 174 { 175 struct ocfs2_control_private *p = file->private_data; 176 p->op_state = state; 177 } 178 179 static inline int ocfs2_control_get_handshake_state(struct file *file) 180 { 181 struct ocfs2_control_private *p = file->private_data; 182 return p->op_state; 183 } 184 185 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 186 { 187 size_t len = strlen(name); 188 struct ocfs2_live_connection *c; 189 190 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 191 192 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 193 if ((c->oc_conn->cc_namelen == len) && 194 !strncmp(c->oc_conn->cc_name, name, len)) 195 return c; 196 } 197 198 return NULL; 199 } 200 201 /* 202 * ocfs2_live_connection structures are created underneath the ocfs2 203 * mount path. Since the VFS prevents multiple calls to 204 * fill_super(), we can't get dupes here. 205 */ 206 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 207 struct ocfs2_live_connection *c) 208 { 209 int rc = 0; 210 211 mutex_lock(&ocfs2_control_lock); 212 c->oc_conn = conn; 213 214 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 215 list_add(&c->oc_list, &ocfs2_live_connection_list); 216 else { 217 printk(KERN_ERR 218 "ocfs2: Userspace control daemon is not present\n"); 219 rc = -ESRCH; 220 } 221 222 mutex_unlock(&ocfs2_control_lock); 223 return rc; 224 } 225 226 /* 227 * This function disconnects the cluster connection from ocfs2_control. 228 * Afterwards, userspace can't affect the cluster connection. 
229 */ 230 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 231 { 232 mutex_lock(&ocfs2_control_lock); 233 list_del_init(&c->oc_list); 234 c->oc_conn = NULL; 235 mutex_unlock(&ocfs2_control_lock); 236 237 kfree(c); 238 } 239 240 static int ocfs2_control_cfu(void *target, size_t target_len, 241 const char __user *buf, size_t count) 242 { 243 /* The T01 expects write(2) calls to have exactly one command */ 244 if ((count != target_len) || 245 (count > sizeof(union ocfs2_control_message))) 246 return -EINVAL; 247 248 if (copy_from_user(target, buf, target_len)) 249 return -EFAULT; 250 251 return 0; 252 } 253 254 static ssize_t ocfs2_control_validate_protocol(struct file *file, 255 const char __user *buf, 256 size_t count) 257 { 258 ssize_t ret; 259 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 260 261 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 262 buf, count); 263 if (ret) 264 return ret; 265 266 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 267 return -EINVAL; 268 269 ocfs2_control_set_handshake_state(file, 270 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 271 272 return count; 273 } 274 275 static void ocfs2_control_send_down(const char *uuid, 276 int nodenum) 277 { 278 struct ocfs2_live_connection *c; 279 280 mutex_lock(&ocfs2_control_lock); 281 282 c = ocfs2_connection_find(uuid); 283 if (c) { 284 BUG_ON(c->oc_conn == NULL); 285 c->oc_conn->cc_recovery_handler(nodenum, 286 c->oc_conn->cc_recovery_data); 287 } 288 289 mutex_unlock(&ocfs2_control_lock); 290 } 291 292 /* 293 * Called whenever configuration elements are sent to /dev/ocfs2_control. 294 * If all configuration elements are present, try to set the global 295 * values. If there is a problem, return an error. Skip any missing 296 * elements, and only bump ocfs2_control_opened when we have all elements 297 * and are successful. 
298 */ 299 static int ocfs2_control_install_private(struct file *file) 300 { 301 int rc = 0; 302 int set_p = 1; 303 struct ocfs2_control_private *p = file->private_data; 304 305 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 306 307 mutex_lock(&ocfs2_control_lock); 308 309 if (p->op_this_node < 0) { 310 set_p = 0; 311 } else if ((ocfs2_control_this_node >= 0) && 312 (ocfs2_control_this_node != p->op_this_node)) { 313 rc = -EINVAL; 314 goto out_unlock; 315 } 316 317 if (!p->op_proto.pv_major) { 318 set_p = 0; 319 } else if (!list_empty(&ocfs2_live_connection_list) && 320 ((running_proto.pv_major != p->op_proto.pv_major) || 321 (running_proto.pv_minor != p->op_proto.pv_minor))) { 322 rc = -EINVAL; 323 goto out_unlock; 324 } 325 326 if (set_p) { 327 ocfs2_control_this_node = p->op_this_node; 328 running_proto.pv_major = p->op_proto.pv_major; 329 running_proto.pv_minor = p->op_proto.pv_minor; 330 } 331 332 out_unlock: 333 mutex_unlock(&ocfs2_control_lock); 334 335 if (!rc && set_p) { 336 /* We set the global values successfully */ 337 atomic_inc(&ocfs2_control_opened); 338 ocfs2_control_set_handshake_state(file, 339 OCFS2_CONTROL_HANDSHAKE_VALID); 340 } 341 342 return rc; 343 } 344 345 static int ocfs2_control_get_this_node(void) 346 { 347 int rc; 348 349 mutex_lock(&ocfs2_control_lock); 350 if (ocfs2_control_this_node < 0) 351 rc = -EINVAL; 352 else 353 rc = ocfs2_control_this_node; 354 mutex_unlock(&ocfs2_control_lock); 355 356 return rc; 357 } 358 359 static int ocfs2_control_do_setnode_msg(struct file *file, 360 struct ocfs2_control_message_setn *msg) 361 { 362 long nodenum; 363 char *ptr = NULL; 364 struct ocfs2_control_private *p = file->private_data; 365 366 if (ocfs2_control_get_handshake_state(file) != 367 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 368 return -EINVAL; 369 370 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 371 OCFS2_CONTROL_MESSAGE_OP_LEN)) 372 return -EINVAL; 373 374 if ((msg->space != ' ') || (msg->newline != '\n')) 375 return 
-EINVAL; 376 msg->space = msg->newline = '\0'; 377 378 nodenum = simple_strtol(msg->nodestr, &ptr, 16); 379 if (!ptr || *ptr) 380 return -EINVAL; 381 382 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 383 (nodenum > INT_MAX) || (nodenum < 0)) 384 return -ERANGE; 385 p->op_this_node = nodenum; 386 387 return ocfs2_control_install_private(file); 388 } 389 390 static int ocfs2_control_do_setversion_msg(struct file *file, 391 struct ocfs2_control_message_setv *msg) 392 { 393 long major, minor; 394 char *ptr = NULL; 395 struct ocfs2_control_private *p = file->private_data; 396 struct ocfs2_protocol_version *max = 397 &ocfs2_user_plugin.sp_max_proto; 398 399 if (ocfs2_control_get_handshake_state(file) != 400 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 401 return -EINVAL; 402 403 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 404 OCFS2_CONTROL_MESSAGE_OP_LEN)) 405 return -EINVAL; 406 407 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 408 (msg->newline != '\n')) 409 return -EINVAL; 410 msg->space1 = msg->space2 = msg->newline = '\0'; 411 412 major = simple_strtol(msg->major, &ptr, 16); 413 if (!ptr || *ptr) 414 return -EINVAL; 415 minor = simple_strtol(msg->minor, &ptr, 16); 416 if (!ptr || *ptr) 417 return -EINVAL; 418 419 /* 420 * The major must be between 1 and 255, inclusive. The minor 421 * must be between 0 and 255, inclusive. The version passed in 422 * must be within the maximum version supported by the filesystem. 
423 */ 424 if ((major == LONG_MIN) || (major == LONG_MAX) || 425 (major > (u8)-1) || (major < 1)) 426 return -ERANGE; 427 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 428 (minor > (u8)-1) || (minor < 0)) 429 return -ERANGE; 430 if ((major != max->pv_major) || 431 (minor > max->pv_minor)) 432 return -EINVAL; 433 434 p->op_proto.pv_major = major; 435 p->op_proto.pv_minor = minor; 436 437 return ocfs2_control_install_private(file); 438 } 439 440 static int ocfs2_control_do_down_msg(struct file *file, 441 struct ocfs2_control_message_down *msg) 442 { 443 long nodenum; 444 char *p = NULL; 445 446 if (ocfs2_control_get_handshake_state(file) != 447 OCFS2_CONTROL_HANDSHAKE_VALID) 448 return -EINVAL; 449 450 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 451 OCFS2_CONTROL_MESSAGE_OP_LEN)) 452 return -EINVAL; 453 454 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 455 (msg->newline != '\n')) 456 return -EINVAL; 457 msg->space1 = msg->space2 = msg->newline = '\0'; 458 459 nodenum = simple_strtol(msg->nodestr, &p, 16); 460 if (!p || *p) 461 return -EINVAL; 462 463 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 464 (nodenum > INT_MAX) || (nodenum < 0)) 465 return -ERANGE; 466 467 ocfs2_control_send_down(msg->uuid, nodenum); 468 469 return 0; 470 } 471 472 static ssize_t ocfs2_control_message(struct file *file, 473 const char __user *buf, 474 size_t count) 475 { 476 ssize_t ret; 477 union ocfs2_control_message msg; 478 479 /* Try to catch padding issues */ 480 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 481 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 482 483 memset(&msg, 0, sizeof(union ocfs2_control_message)); 484 ret = ocfs2_control_cfu(&msg, count, buf, count); 485 if (ret) 486 goto out; 487 488 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 489 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 490 OCFS2_CONTROL_MESSAGE_OP_LEN)) 491 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 492 else if ((count == 
OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 493 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 494 OCFS2_CONTROL_MESSAGE_OP_LEN)) 495 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 496 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 497 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 498 OCFS2_CONTROL_MESSAGE_OP_LEN)) 499 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 500 else 501 ret = -EINVAL; 502 503 out: 504 return ret ? ret : count; 505 } 506 507 static ssize_t ocfs2_control_write(struct file *file, 508 const char __user *buf, 509 size_t count, 510 loff_t *ppos) 511 { 512 ssize_t ret; 513 514 switch (ocfs2_control_get_handshake_state(file)) { 515 case OCFS2_CONTROL_HANDSHAKE_INVALID: 516 ret = -EINVAL; 517 break; 518 519 case OCFS2_CONTROL_HANDSHAKE_READ: 520 ret = ocfs2_control_validate_protocol(file, buf, 521 count); 522 break; 523 524 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 525 case OCFS2_CONTROL_HANDSHAKE_VALID: 526 ret = ocfs2_control_message(file, buf, count); 527 break; 528 529 default: 530 BUG(); 531 ret = -EIO; 532 break; 533 } 534 535 return ret; 536 } 537 538 /* 539 * This is a naive version. If we ever have a new protocol, we'll expand 540 * it. Probably using seq_file. 541 */ 542 static ssize_t ocfs2_control_read(struct file *file, 543 char __user *buf, 544 size_t count, 545 loff_t *ppos) 546 { 547 ssize_t ret; 548 549 ret = simple_read_from_buffer(buf, count, ppos, 550 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 551 552 /* Have we read the whole protocol list? 
*/ 553 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 554 ocfs2_control_set_handshake_state(file, 555 OCFS2_CONTROL_HANDSHAKE_READ); 556 557 return ret; 558 } 559 560 static int ocfs2_control_release(struct inode *inode, struct file *file) 561 { 562 struct ocfs2_control_private *p = file->private_data; 563 564 mutex_lock(&ocfs2_control_lock); 565 566 if (ocfs2_control_get_handshake_state(file) != 567 OCFS2_CONTROL_HANDSHAKE_VALID) 568 goto out; 569 570 if (atomic_dec_and_test(&ocfs2_control_opened)) { 571 if (!list_empty(&ocfs2_live_connection_list)) { 572 /* XXX: Do bad things! */ 573 printk(KERN_ERR 574 "ocfs2: Unexpected release of ocfs2_control!\n" 575 " Loss of cluster connection requires " 576 "an emergency restart!\n"); 577 emergency_restart(); 578 } 579 /* 580 * Last valid close clears the node number and resets 581 * the locking protocol version 582 */ 583 ocfs2_control_this_node = -1; 584 running_proto.pv_major = 0; 585 running_proto.pv_minor = 0; 586 } 587 588 out: 589 list_del_init(&p->op_list); 590 file->private_data = NULL; 591 592 mutex_unlock(&ocfs2_control_lock); 593 594 kfree(p); 595 596 return 0; 597 } 598 599 static int ocfs2_control_open(struct inode *inode, struct file *file) 600 { 601 struct ocfs2_control_private *p; 602 603 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 604 if (!p) 605 return -ENOMEM; 606 p->op_this_node = -1; 607 608 mutex_lock(&ocfs2_control_lock); 609 file->private_data = p; 610 list_add(&p->op_list, &ocfs2_control_private_list); 611 mutex_unlock(&ocfs2_control_lock); 612 613 return 0; 614 } 615 616 static const struct file_operations ocfs2_control_fops = { 617 .open = ocfs2_control_open, 618 .release = ocfs2_control_release, 619 .read = ocfs2_control_read, 620 .write = ocfs2_control_write, 621 .owner = THIS_MODULE, 622 .llseek = default_llseek, 623 }; 624 625 static struct miscdevice ocfs2_control_device = { 626 .minor = MISC_DYNAMIC_MINOR, 627 .name = "ocfs2_control", 628 .fops = &ocfs2_control_fops, 
629 }; 630 631 static int ocfs2_control_init(void) 632 { 633 int rc; 634 635 atomic_set(&ocfs2_control_opened, 0); 636 637 rc = misc_register(&ocfs2_control_device); 638 if (rc) 639 printk(KERN_ERR 640 "ocfs2: Unable to register ocfs2_control device " 641 "(errno %d)\n", 642 -rc); 643 644 return rc; 645 } 646 647 static void ocfs2_control_exit(void) 648 { 649 misc_deregister(&ocfs2_control_device); 650 } 651 652 static void fsdlm_lock_ast_wrapper(void *astarg) 653 { 654 struct ocfs2_dlm_lksb *lksb = astarg; 655 int status = lksb->lksb_fsdlm.sb_status; 656 657 /* 658 * For now we're punting on the issue of other non-standard errors 659 * where we can't tell if the unlock_ast or lock_ast should be called. 660 * The main "other error" that's possible is EINVAL which means the 661 * function was called with invalid args, which shouldn't be possible 662 * since the caller here is under our control. Other non-standard 663 * errors probably fall into the same category, or otherwise are fatal 664 * which means we can't carry on anyway. 
665 */ 666 667 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 668 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 669 else 670 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 671 } 672 673 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 674 { 675 struct ocfs2_dlm_lksb *lksb = astarg; 676 677 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 678 } 679 680 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 681 int mode, 682 struct ocfs2_dlm_lksb *lksb, 683 u32 flags, 684 void *name, 685 unsigned int namelen) 686 { 687 if (!lksb->lksb_fsdlm.sb_lvbptr) 688 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 689 sizeof(struct dlm_lksb); 690 691 return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 692 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 693 fsdlm_lock_ast_wrapper, lksb, 694 fsdlm_blocking_ast_wrapper); 695 } 696 697 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 698 struct ocfs2_dlm_lksb *lksb, 699 u32 flags) 700 { 701 return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 702 flags, &lksb->lksb_fsdlm, lksb); 703 } 704 705 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 706 { 707 return lksb->lksb_fsdlm.sb_status; 708 } 709 710 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 711 { 712 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 713 714 return !invalid; 715 } 716 717 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 718 { 719 if (!lksb->lksb_fsdlm.sb_lvbptr) 720 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 721 sizeof(struct dlm_lksb); 722 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 723 } 724 725 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 726 { 727 } 728 729 static int user_plock(struct ocfs2_cluster_connection *conn, 730 u64 ino, 731 struct file *file, 732 int cmd, 733 struct file_lock *fl) 734 { 735 /* 736 * This more or less just demuxes the plock request into any 737 * one of three dlm calls. 
738 * 739 * Internally, fs/dlm will pass these to a misc device, which 740 * a userspace daemon will read and write to. 741 * 742 * For now, cancel requests (which happen internally only), 743 * are turned into unlocks. Most of this function taken from 744 * gfs2_lock. 745 */ 746 747 if (cmd == F_CANCELLK) { 748 cmd = F_SETLK; 749 fl->fl_type = F_UNLCK; 750 } 751 752 if (IS_GETLK(cmd)) 753 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 754 else if (fl->fl_type == F_UNLCK) 755 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 756 else 757 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 758 } 759 760 /* 761 * Compare a requested locking protocol version against the current one. 762 * 763 * If the major numbers are different, they are incompatible. 764 * If the current minor is greater than the request, they are incompatible. 765 * If the current minor is less than or equal to the request, they are 766 * compatible, and the requester should run at the current minor version. 767 */ 768 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 769 struct ocfs2_protocol_version *request) 770 { 771 if (existing->pv_major != request->pv_major) 772 return 1; 773 774 if (existing->pv_minor > request->pv_minor) 775 return 1; 776 777 if (existing->pv_minor < request->pv_minor) 778 request->pv_minor = existing->pv_minor; 779 780 return 0; 781 } 782 783 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 784 { 785 struct ocfs2_protocol_version *pv = 786 (struct ocfs2_protocol_version *)lvb; 787 /* 788 * ocfs2_protocol_version has two u8 variables, so we don't 789 * need any endian conversion. 
790 */ 791 ver->pv_major = pv->pv_major; 792 ver->pv_minor = pv->pv_minor; 793 } 794 795 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 796 { 797 struct ocfs2_protocol_version *pv = 798 (struct ocfs2_protocol_version *)lvb; 799 /* 800 * ocfs2_protocol_version has two u8 variables, so we don't 801 * need any endian conversion. 802 */ 803 pv->pv_major = ver->pv_major; 804 pv->pv_minor = ver->pv_minor; 805 } 806 807 static void sync_wait_cb(void *arg) 808 { 809 struct ocfs2_cluster_connection *conn = arg; 810 struct ocfs2_live_connection *lc = conn->cc_private; 811 complete(&lc->oc_sync_wait); 812 } 813 814 static int sync_unlock(struct ocfs2_cluster_connection *conn, 815 struct dlm_lksb *lksb, char *name) 816 { 817 int error; 818 struct ocfs2_live_connection *lc = conn->cc_private; 819 820 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 821 if (error) { 822 printk(KERN_ERR "%s lkid %x error %d\n", 823 name, lksb->sb_lkid, error); 824 return error; 825 } 826 827 wait_for_completion(&lc->oc_sync_wait); 828 829 if (lksb->sb_status != -DLM_EUNLOCK) { 830 printk(KERN_ERR "%s lkid %x status %d\n", 831 name, lksb->sb_lkid, lksb->sb_status); 832 return -1; 833 } 834 return 0; 835 } 836 837 static int sync_lock(struct ocfs2_cluster_connection *conn, 838 int mode, uint32_t flags, 839 struct dlm_lksb *lksb, char *name) 840 { 841 int error, status; 842 struct ocfs2_live_connection *lc = conn->cc_private; 843 844 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 845 name, strlen(name), 846 0, sync_wait_cb, conn, NULL); 847 if (error) { 848 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 849 name, lksb->sb_lkid, flags, mode, error); 850 return error; 851 } 852 853 wait_for_completion(&lc->oc_sync_wait); 854 855 status = lksb->sb_status; 856 857 if (status && status != -EAGAIN) { 858 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 859 name, lksb->sb_lkid, flags, mode, status); 860 } 861 862 return status; 
863 } 864 865 866 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 867 int flags) 868 { 869 struct ocfs2_live_connection *lc = conn->cc_private; 870 return sync_lock(conn, mode, flags, 871 &lc->oc_version_lksb, VERSION_LOCK); 872 } 873 874 static int version_unlock(struct ocfs2_cluster_connection *conn) 875 { 876 struct ocfs2_live_connection *lc = conn->cc_private; 877 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 878 } 879 880 /* get_protocol_version() 881 * 882 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 883 * The algorithm is: 884 * 1. Attempt to take the lock in EX mode (non-blocking). 885 * 2. If successful (which means it is the first mount), write the 886 * version number and downconvert to PR lock. 887 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 888 * taking the PR lock. 889 */ 890 891 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 892 { 893 int ret; 894 struct ocfs2_live_connection *lc = conn->cc_private; 895 struct ocfs2_protocol_version pv; 896 897 running_proto.pv_major = 898 ocfs2_user_plugin.sp_max_proto.pv_major; 899 running_proto.pv_minor = 900 ocfs2_user_plugin.sp_max_proto.pv_minor; 901 902 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 903 ret = version_lock(conn, DLM_LOCK_EX, 904 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 905 if (!ret) { 906 conn->cc_version.pv_major = running_proto.pv_major; 907 conn->cc_version.pv_minor = running_proto.pv_minor; 908 version_to_lvb(&running_proto, lc->oc_lvb); 909 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 910 } else if (ret == -EAGAIN) { 911 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 912 if (ret) 913 goto out; 914 lvb_to_version(lc->oc_lvb, &pv); 915 916 if ((pv.pv_major != running_proto.pv_major) || 917 (pv.pv_minor > running_proto.pv_minor)) { 918 ret = -EINVAL; 919 goto out; 920 } 921 922 conn->cc_version.pv_major = pv.pv_major; 923 conn->cc_version.pv_minor = 
pv.pv_minor; 924 } 925 out: 926 return ret; 927 } 928 929 static void user_recover_prep(void *arg) 930 { 931 } 932 933 static void user_recover_slot(void *arg, struct dlm_slot *slot) 934 { 935 struct ocfs2_cluster_connection *conn = arg; 936 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", 937 slot->nodeid, slot->slot); 938 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 939 940 } 941 942 static void user_recover_done(void *arg, struct dlm_slot *slots, 943 int num_slots, int our_slot, 944 uint32_t generation) 945 { 946 struct ocfs2_cluster_connection *conn = arg; 947 struct ocfs2_live_connection *lc = conn->cc_private; 948 int i; 949 950 for (i = 0; i < num_slots; i++) 951 if (slots[i].slot == our_slot) { 952 atomic_set(&lc->oc_this_node, slots[i].nodeid); 953 break; 954 } 955 956 lc->oc_our_slot = our_slot; 957 wake_up(&lc->oc_wait); 958 } 959 960 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 961 .recover_prep = user_recover_prep, 962 .recover_slot = user_recover_slot, 963 .recover_done = user_recover_done, 964 }; 965 966 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 967 { 968 version_unlock(conn); 969 dlm_release_lockspace(conn->cc_lockspace, 2); 970 conn->cc_lockspace = NULL; 971 ocfs2_live_connection_drop(conn->cc_private); 972 conn->cc_private = NULL; 973 return 0; 974 } 975 976 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 977 { 978 dlm_lockspace_t *fsdlm; 979 struct ocfs2_live_connection *lc; 980 int rc, ops_rv; 981 982 BUG_ON(conn == NULL); 983 984 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 985 if (!lc) 986 return -ENOMEM; 987 988 init_waitqueue_head(&lc->oc_wait); 989 init_completion(&lc->oc_sync_wait); 990 atomic_set(&lc->oc_this_node, 0); 991 conn->cc_private = lc; 992 lc->oc_type = NO_CONTROLD; 993 994 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 995 DLM_LSFL_NEWEXCL, DLM_LVB_LEN, 996 &ocfs2_ls_ops, conn, &ops_rv, 
&fsdlm); 997 if (rc) { 998 if (rc == -EEXIST || rc == -EPROTO) 999 printk(KERN_ERR "ocfs2: Unable to create the " 1000 "lockspace %s (%d), because a ocfs2-tools " 1001 "program is running on this file system " 1002 "with the same name lockspace\n", 1003 conn->cc_name, rc); 1004 goto out; 1005 } 1006 1007 if (ops_rv == -EOPNOTSUPP) { 1008 lc->oc_type = WITH_CONTROLD; 1009 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 1010 "version of dlm_controld and/or ocfs2-tools." 1011 " Please consider upgrading.\n"); 1012 } else if (ops_rv) { 1013 rc = ops_rv; 1014 goto out; 1015 } 1016 conn->cc_lockspace = fsdlm; 1017 1018 rc = ocfs2_live_connection_attach(conn, lc); 1019 if (rc) 1020 goto out; 1021 1022 if (lc->oc_type == NO_CONTROLD) { 1023 rc = get_protocol_version(conn); 1024 if (rc) { 1025 printk(KERN_ERR "ocfs2: Could not determine" 1026 " locking version\n"); 1027 user_cluster_disconnect(conn); 1028 goto out; 1029 } 1030 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1031 } 1032 1033 /* 1034 * running_proto must have been set before we allowed any mounts 1035 * to proceed. 
1036 */ 1037 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1038 printk(KERN_ERR 1039 "Unable to mount with fs locking protocol version " 1040 "%u.%u because negotiated protocol is %u.%u\n", 1041 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1042 running_proto.pv_major, running_proto.pv_minor); 1043 rc = -EPROTO; 1044 ocfs2_live_connection_drop(lc); 1045 lc = NULL; 1046 } 1047 1048 out: 1049 if (rc) 1050 kfree(lc); 1051 return rc; 1052 } 1053 1054 1055 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1056 unsigned int *this_node) 1057 { 1058 int rc; 1059 struct ocfs2_live_connection *lc = conn->cc_private; 1060 1061 if (lc->oc_type == WITH_CONTROLD) 1062 rc = ocfs2_control_get_this_node(); 1063 else if (lc->oc_type == NO_CONTROLD) 1064 rc = atomic_read(&lc->oc_this_node); 1065 else 1066 rc = -EINVAL; 1067 1068 if (rc < 0) 1069 return rc; 1070 1071 *this_node = rc; 1072 return 0; 1073 } 1074 1075 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1076 .connect = user_cluster_connect, 1077 .disconnect = user_cluster_disconnect, 1078 .this_node = user_cluster_this_node, 1079 .dlm_lock = user_dlm_lock, 1080 .dlm_unlock = user_dlm_unlock, 1081 .lock_status = user_dlm_lock_status, 1082 .lvb_valid = user_dlm_lvb_valid, 1083 .lock_lvb = user_dlm_lvb, 1084 .plock = user_plock, 1085 .dump_lksb = user_dlm_dump_lksb, 1086 }; 1087 1088 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1089 .sp_name = "user", 1090 .sp_ops = &ocfs2_user_plugin_ops, 1091 .sp_owner = THIS_MODULE, 1092 }; 1093 1094 1095 static int __init ocfs2_user_plugin_init(void) 1096 { 1097 int rc; 1098 1099 rc = ocfs2_control_init(); 1100 if (!rc) { 1101 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1102 if (rc) 1103 ocfs2_control_exit(); 1104 } 1105 1106 return rc; 1107 } 1108 1109 static void __exit ocfs2_user_plugin_exit(void) 1110 { 1111 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1112 ocfs2_control_exit(); 1113 } 1114 1115 
/* Module metadata, followed by registration of the init and exit hooks. */
MODULE_AUTHOR("Oracle");
MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
MODULE_LICENSE("GPL");
module_init(ocfs2_user_plugin_init);
module_exit(ocfs2_user_plugin_exit);