// SPDX-License-Identifier: GPL-2.0-only
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * stack_user.c
 *
 * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
 *
 * Copyright (C) 2007 Oracle. All rights reserved.
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/reboot.h>
#include <linux/sched.h>
#include <linux/uaccess.h>

#include "stackglue.h"

#include <linux/dlm_plock.h>

/*
 * The control protocol starts with a handshake. Until the handshake
 * is complete, the control device will fail all write(2)s.
 *
 * The handshake is simple. First, the client reads until EOF. Each line
 * of output is a supported protocol tag. All protocol tags are a single
 * character followed by a two hex digit version number. Currently the
 * only thing supported is T01, for "Text-based version 0x01". Next, the
 * client writes the version they would like to use, including the newline.
 * Thus, the protocol tag is 'T01\n'. If the version tag written is
 * unknown, -EINVAL is returned. Once the negotiation is complete, the
 * client can start sending messages.
 *
 * The T01 protocol has three messages. First is the "SETN" message.
 * It has the following syntax:
 *
 *  SETN<space><8-char-hex-nodenum><newline>
 *
 * This is 14 characters.
 *
 * The "SETN" message must be the first message following the protocol.
 * It tells ocfs2_control the local node number.
 *
 * Next comes the "SETV" message. It has the following syntax:
 *
 *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * This is 11 characters.
 *
 * The "SETV" message sets the filesystem locking protocol version as
 * negotiated by the client. The client negotiates based on the maximum
 * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
 * number from the "SETV" message must match
 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ...sp_max_proto.pv_minor.
 *
 * Once this information has been set, mounts will be allowed. From this
 * point on, the "DOWN" message can be sent for node down notification.
 * It has the following syntax:
 *
 *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * eg:
 *
 *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
 *
 * This is 47 characters.
 */

/*
 * Whether or not the client has done the handshake.
 * For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO "T01\n"
#define OCFS2_CONTROL_PROTO_LEN 4

/*
 * Handshake states. A freshly opened file starts in INVALID (its private
 * data is zero-allocated), moves to READ once the protocol list has been
 * read, to PROTOCOL once "T01\n" has been written, and to VALID once both
 * the node number and the locking protocol version have been installed.
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID (0)
#define OCFS2_CONTROL_HANDSHAKE_READ (1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2)
#define OCFS2_CONTROL_HANDSHAKE_VALID (3)

/*
 * Messages. Every message starts with a 4-character operation tag; the
 * *_TOTAL_LEN values are the exact byte counts described above, including
 * the separators and the trailing newline.
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN 4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47
#define OCFS2_TEXT_UUID_LEN 32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8
/* Name of the DLM lock whose LVB carries the locking protocol version. */
#define VERSION_LOCK "version_lock"

/*
 * How a live connection learns its node number and protocol version:
 * WITH_CONTROLD uses the values installed through the ocfs2_control
 * device, NO_CONTROLD uses the DLM lockspace callbacks and the version
 * lock instead.
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD
};

/*
 * ocfs2_live_connection is refcounted because the filesystem and
 * miscdevice sides can detach in different order. Let's just be safe.
108 */ 109 struct ocfs2_live_connection { 110 struct list_head oc_list; 111 struct ocfs2_cluster_connection *oc_conn; 112 enum ocfs2_connection_type oc_type; 113 atomic_t oc_this_node; 114 int oc_our_slot; 115 struct dlm_lksb oc_version_lksb; 116 char oc_lvb[DLM_LVB_LEN]; 117 struct completion oc_sync_wait; 118 wait_queue_head_t oc_wait; 119 }; 120 121 struct ocfs2_control_private { 122 struct list_head op_list; 123 int op_state; 124 int op_this_node; 125 struct ocfs2_protocol_version op_proto; 126 }; 127 128 /* SETN<space><8-char-hex-nodenum><newline> */ 129 struct ocfs2_control_message_setn { 130 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 131 char space; 132 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 133 char newline; 134 }; 135 136 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 137 struct ocfs2_control_message_setv { 138 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 139 char space1; 140 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 141 char space2; 142 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 143 char newline; 144 }; 145 146 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 147 struct ocfs2_control_message_down { 148 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 149 char space1; 150 char uuid[OCFS2_TEXT_UUID_LEN]; 151 char space2; 152 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 153 char newline; 154 }; 155 156 union ocfs2_control_message { 157 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 158 struct ocfs2_control_message_setn u_setn; 159 struct ocfs2_control_message_setv u_setv; 160 struct ocfs2_control_message_down u_down; 161 }; 162 163 static struct ocfs2_stack_plugin ocfs2_user_plugin; 164 165 static atomic_t ocfs2_control_opened; 166 static int ocfs2_control_this_node = -1; 167 static struct ocfs2_protocol_version running_proto; 168 169 static LIST_HEAD(ocfs2_live_connection_list); 170 static LIST_HEAD(ocfs2_control_private_list); 171 static DEFINE_MUTEX(ocfs2_control_lock); 172 173 static inline void 
ocfs2_control_set_handshake_state(struct file *file, 174 int state) 175 { 176 struct ocfs2_control_private *p = file->private_data; 177 p->op_state = state; 178 } 179 180 static inline int ocfs2_control_get_handshake_state(struct file *file) 181 { 182 struct ocfs2_control_private *p = file->private_data; 183 return p->op_state; 184 } 185 186 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 187 { 188 size_t len = strlen(name); 189 struct ocfs2_live_connection *c; 190 191 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 192 193 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 194 if ((c->oc_conn->cc_namelen == len) && 195 !strncmp(c->oc_conn->cc_name, name, len)) 196 return c; 197 } 198 199 return NULL; 200 } 201 202 /* 203 * ocfs2_live_connection structures are created underneath the ocfs2 204 * mount path. Since the VFS prevents multiple calls to 205 * fill_super(), we can't get dupes here. 206 */ 207 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 208 struct ocfs2_live_connection *c) 209 { 210 int rc = 0; 211 212 mutex_lock(&ocfs2_control_lock); 213 c->oc_conn = conn; 214 215 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 216 list_add(&c->oc_list, &ocfs2_live_connection_list); 217 else { 218 printk(KERN_ERR 219 "ocfs2: Userspace control daemon is not present\n"); 220 rc = -ESRCH; 221 } 222 223 mutex_unlock(&ocfs2_control_lock); 224 return rc; 225 } 226 227 /* 228 * This function disconnects the cluster connection from ocfs2_control. 229 * Afterwards, userspace can't affect the cluster connection. 
230 */ 231 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 232 { 233 mutex_lock(&ocfs2_control_lock); 234 list_del_init(&c->oc_list); 235 c->oc_conn = NULL; 236 mutex_unlock(&ocfs2_control_lock); 237 238 kfree(c); 239 } 240 241 static int ocfs2_control_cfu(void *target, size_t target_len, 242 const char __user *buf, size_t count) 243 { 244 /* The T01 expects write(2) calls to have exactly one command */ 245 if ((count != target_len) || 246 (count > sizeof(union ocfs2_control_message))) 247 return -EINVAL; 248 249 if (copy_from_user(target, buf, target_len)) 250 return -EFAULT; 251 252 return 0; 253 } 254 255 static ssize_t ocfs2_control_validate_protocol(struct file *file, 256 const char __user *buf, 257 size_t count) 258 { 259 ssize_t ret; 260 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 261 262 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 263 buf, count); 264 if (ret) 265 return ret; 266 267 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 268 return -EINVAL; 269 270 ocfs2_control_set_handshake_state(file, 271 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 272 273 return count; 274 } 275 276 static void ocfs2_control_send_down(const char *uuid, 277 int nodenum) 278 { 279 struct ocfs2_live_connection *c; 280 281 mutex_lock(&ocfs2_control_lock); 282 283 c = ocfs2_connection_find(uuid); 284 if (c) { 285 BUG_ON(c->oc_conn == NULL); 286 c->oc_conn->cc_recovery_handler(nodenum, 287 c->oc_conn->cc_recovery_data); 288 } 289 290 mutex_unlock(&ocfs2_control_lock); 291 } 292 293 /* 294 * Called whenever configuration elements are sent to /dev/ocfs2_control. 295 * If all configuration elements are present, try to set the global 296 * values. If there is a problem, return an error. Skip any missing 297 * elements, and only bump ocfs2_control_opened when we have all elements 298 * and are successful. 
299 */ 300 static int ocfs2_control_install_private(struct file *file) 301 { 302 int rc = 0; 303 int set_p = 1; 304 struct ocfs2_control_private *p = file->private_data; 305 306 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 307 308 mutex_lock(&ocfs2_control_lock); 309 310 if (p->op_this_node < 0) { 311 set_p = 0; 312 } else if ((ocfs2_control_this_node >= 0) && 313 (ocfs2_control_this_node != p->op_this_node)) { 314 rc = -EINVAL; 315 goto out_unlock; 316 } 317 318 if (!p->op_proto.pv_major) { 319 set_p = 0; 320 } else if (!list_empty(&ocfs2_live_connection_list) && 321 ((running_proto.pv_major != p->op_proto.pv_major) || 322 (running_proto.pv_minor != p->op_proto.pv_minor))) { 323 rc = -EINVAL; 324 goto out_unlock; 325 } 326 327 if (set_p) { 328 ocfs2_control_this_node = p->op_this_node; 329 running_proto.pv_major = p->op_proto.pv_major; 330 running_proto.pv_minor = p->op_proto.pv_minor; 331 } 332 333 out_unlock: 334 mutex_unlock(&ocfs2_control_lock); 335 336 if (!rc && set_p) { 337 /* We set the global values successfully */ 338 atomic_inc(&ocfs2_control_opened); 339 ocfs2_control_set_handshake_state(file, 340 OCFS2_CONTROL_HANDSHAKE_VALID); 341 } 342 343 return rc; 344 } 345 346 static int ocfs2_control_get_this_node(void) 347 { 348 int rc; 349 350 mutex_lock(&ocfs2_control_lock); 351 if (ocfs2_control_this_node < 0) 352 rc = -EINVAL; 353 else 354 rc = ocfs2_control_this_node; 355 mutex_unlock(&ocfs2_control_lock); 356 357 return rc; 358 } 359 360 static int ocfs2_control_do_setnode_msg(struct file *file, 361 struct ocfs2_control_message_setn *msg) 362 { 363 long nodenum; 364 char *ptr = NULL; 365 struct ocfs2_control_private *p = file->private_data; 366 367 if (ocfs2_control_get_handshake_state(file) != 368 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 369 return -EINVAL; 370 371 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 372 OCFS2_CONTROL_MESSAGE_OP_LEN)) 373 return -EINVAL; 374 375 if ((msg->space != ' ') || (msg->newline != '\n')) 376 return 
-EINVAL; 377 msg->space = msg->newline = '\0'; 378 379 nodenum = simple_strtol(msg->nodestr, &ptr, 16); 380 if (!ptr || *ptr) 381 return -EINVAL; 382 383 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 384 (nodenum > INT_MAX) || (nodenum < 0)) 385 return -ERANGE; 386 p->op_this_node = nodenum; 387 388 return ocfs2_control_install_private(file); 389 } 390 391 static int ocfs2_control_do_setversion_msg(struct file *file, 392 struct ocfs2_control_message_setv *msg) 393 { 394 long major, minor; 395 char *ptr = NULL; 396 struct ocfs2_control_private *p = file->private_data; 397 struct ocfs2_protocol_version *max = 398 &ocfs2_user_plugin.sp_max_proto; 399 400 if (ocfs2_control_get_handshake_state(file) != 401 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 402 return -EINVAL; 403 404 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 405 OCFS2_CONTROL_MESSAGE_OP_LEN)) 406 return -EINVAL; 407 408 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 409 (msg->newline != '\n')) 410 return -EINVAL; 411 msg->space1 = msg->space2 = msg->newline = '\0'; 412 413 major = simple_strtol(msg->major, &ptr, 16); 414 if (!ptr || *ptr) 415 return -EINVAL; 416 minor = simple_strtol(msg->minor, &ptr, 16); 417 if (!ptr || *ptr) 418 return -EINVAL; 419 420 /* 421 * The major must be between 1 and 255, inclusive. The minor 422 * must be between 0 and 255, inclusive. The version passed in 423 * must be within the maximum version supported by the filesystem. 
424 */ 425 if ((major == LONG_MIN) || (major == LONG_MAX) || 426 (major > (u8)-1) || (major < 1)) 427 return -ERANGE; 428 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 429 (minor > (u8)-1) || (minor < 0)) 430 return -ERANGE; 431 if ((major != max->pv_major) || 432 (minor > max->pv_minor)) 433 return -EINVAL; 434 435 p->op_proto.pv_major = major; 436 p->op_proto.pv_minor = minor; 437 438 return ocfs2_control_install_private(file); 439 } 440 441 static int ocfs2_control_do_down_msg(struct file *file, 442 struct ocfs2_control_message_down *msg) 443 { 444 long nodenum; 445 char *p = NULL; 446 447 if (ocfs2_control_get_handshake_state(file) != 448 OCFS2_CONTROL_HANDSHAKE_VALID) 449 return -EINVAL; 450 451 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 452 OCFS2_CONTROL_MESSAGE_OP_LEN)) 453 return -EINVAL; 454 455 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 456 (msg->newline != '\n')) 457 return -EINVAL; 458 msg->space1 = msg->space2 = msg->newline = '\0'; 459 460 nodenum = simple_strtol(msg->nodestr, &p, 16); 461 if (!p || *p) 462 return -EINVAL; 463 464 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 465 (nodenum > INT_MAX) || (nodenum < 0)) 466 return -ERANGE; 467 468 ocfs2_control_send_down(msg->uuid, nodenum); 469 470 return 0; 471 } 472 473 static ssize_t ocfs2_control_message(struct file *file, 474 const char __user *buf, 475 size_t count) 476 { 477 ssize_t ret; 478 union ocfs2_control_message msg; 479 480 /* Try to catch padding issues */ 481 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 482 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 483 484 memset(&msg, 0, sizeof(union ocfs2_control_message)); 485 ret = ocfs2_control_cfu(&msg, count, buf, count); 486 if (ret) 487 goto out; 488 489 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 490 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 491 OCFS2_CONTROL_MESSAGE_OP_LEN)) 492 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 493 else if ((count == 
OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 494 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 495 OCFS2_CONTROL_MESSAGE_OP_LEN)) 496 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 497 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 498 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 499 OCFS2_CONTROL_MESSAGE_OP_LEN)) 500 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 501 else 502 ret = -EINVAL; 503 504 out: 505 return ret ? ret : count; 506 } 507 508 static ssize_t ocfs2_control_write(struct file *file, 509 const char __user *buf, 510 size_t count, 511 loff_t *ppos) 512 { 513 ssize_t ret; 514 515 switch (ocfs2_control_get_handshake_state(file)) { 516 case OCFS2_CONTROL_HANDSHAKE_INVALID: 517 ret = -EINVAL; 518 break; 519 520 case OCFS2_CONTROL_HANDSHAKE_READ: 521 ret = ocfs2_control_validate_protocol(file, buf, 522 count); 523 break; 524 525 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 526 case OCFS2_CONTROL_HANDSHAKE_VALID: 527 ret = ocfs2_control_message(file, buf, count); 528 break; 529 530 default: 531 BUG(); 532 ret = -EIO; 533 break; 534 } 535 536 return ret; 537 } 538 539 /* 540 * This is a naive version. If we ever have a new protocol, we'll expand 541 * it. Probably using seq_file. 542 */ 543 static ssize_t ocfs2_control_read(struct file *file, 544 char __user *buf, 545 size_t count, 546 loff_t *ppos) 547 { 548 ssize_t ret; 549 550 ret = simple_read_from_buffer(buf, count, ppos, 551 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 552 553 /* Have we read the whole protocol list? 
*/ 554 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 555 ocfs2_control_set_handshake_state(file, 556 OCFS2_CONTROL_HANDSHAKE_READ); 557 558 return ret; 559 } 560 561 static int ocfs2_control_release(struct inode *inode, struct file *file) 562 { 563 struct ocfs2_control_private *p = file->private_data; 564 565 mutex_lock(&ocfs2_control_lock); 566 567 if (ocfs2_control_get_handshake_state(file) != 568 OCFS2_CONTROL_HANDSHAKE_VALID) 569 goto out; 570 571 if (atomic_dec_and_test(&ocfs2_control_opened)) { 572 if (!list_empty(&ocfs2_live_connection_list)) { 573 /* XXX: Do bad things! */ 574 printk(KERN_ERR 575 "ocfs2: Unexpected release of ocfs2_control!\n" 576 " Loss of cluster connection requires " 577 "an emergency restart!\n"); 578 emergency_restart(); 579 } 580 /* 581 * Last valid close clears the node number and resets 582 * the locking protocol version 583 */ 584 ocfs2_control_this_node = -1; 585 running_proto.pv_major = 0; 586 running_proto.pv_minor = 0; 587 } 588 589 out: 590 list_del_init(&p->op_list); 591 file->private_data = NULL; 592 593 mutex_unlock(&ocfs2_control_lock); 594 595 kfree(p); 596 597 return 0; 598 } 599 600 static int ocfs2_control_open(struct inode *inode, struct file *file) 601 { 602 struct ocfs2_control_private *p; 603 604 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 605 if (!p) 606 return -ENOMEM; 607 p->op_this_node = -1; 608 609 mutex_lock(&ocfs2_control_lock); 610 file->private_data = p; 611 list_add(&p->op_list, &ocfs2_control_private_list); 612 mutex_unlock(&ocfs2_control_lock); 613 614 return 0; 615 } 616 617 static const struct file_operations ocfs2_control_fops = { 618 .open = ocfs2_control_open, 619 .release = ocfs2_control_release, 620 .read = ocfs2_control_read, 621 .write = ocfs2_control_write, 622 .owner = THIS_MODULE, 623 .llseek = default_llseek, 624 }; 625 626 static struct miscdevice ocfs2_control_device = { 627 .minor = MISC_DYNAMIC_MINOR, 628 .name = "ocfs2_control", 629 .fops = &ocfs2_control_fops, 
630 }; 631 632 static int ocfs2_control_init(void) 633 { 634 int rc; 635 636 atomic_set(&ocfs2_control_opened, 0); 637 638 rc = misc_register(&ocfs2_control_device); 639 if (rc) 640 printk(KERN_ERR 641 "ocfs2: Unable to register ocfs2_control device " 642 "(errno %d)\n", 643 -rc); 644 645 return rc; 646 } 647 648 static void ocfs2_control_exit(void) 649 { 650 misc_deregister(&ocfs2_control_device); 651 } 652 653 static void fsdlm_lock_ast_wrapper(void *astarg) 654 { 655 struct ocfs2_dlm_lksb *lksb = astarg; 656 int status = lksb->lksb_fsdlm.sb_status; 657 658 /* 659 * For now we're punting on the issue of other non-standard errors 660 * where we can't tell if the unlock_ast or lock_ast should be called. 661 * The main "other error" that's possible is EINVAL which means the 662 * function was called with invalid args, which shouldn't be possible 663 * since the caller here is under our control. Other non-standard 664 * errors probably fall into the same category, or otherwise are fatal 665 * which means we can't carry on anyway. 
666 */ 667 668 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 669 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 670 else 671 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 672 } 673 674 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 675 { 676 struct ocfs2_dlm_lksb *lksb = astarg; 677 678 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 679 } 680 681 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 682 int mode, 683 struct ocfs2_dlm_lksb *lksb, 684 u32 flags, 685 void *name, 686 unsigned int namelen) 687 { 688 int ret; 689 690 if (!lksb->lksb_fsdlm.sb_lvbptr) 691 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 692 sizeof(struct dlm_lksb); 693 694 ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 695 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 696 fsdlm_lock_ast_wrapper, lksb, 697 fsdlm_blocking_ast_wrapper); 698 return ret; 699 } 700 701 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 702 struct ocfs2_dlm_lksb *lksb, 703 u32 flags) 704 { 705 int ret; 706 707 ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 708 flags, &lksb->lksb_fsdlm, lksb); 709 return ret; 710 } 711 712 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 713 { 714 return lksb->lksb_fsdlm.sb_status; 715 } 716 717 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 718 { 719 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 720 721 return !invalid; 722 } 723 724 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 725 { 726 if (!lksb->lksb_fsdlm.sb_lvbptr) 727 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 728 sizeof(struct dlm_lksb); 729 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 730 } 731 732 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 733 { 734 } 735 736 static int user_plock(struct ocfs2_cluster_connection *conn, 737 u64 ino, 738 struct file *file, 739 int cmd, 740 struct file_lock *fl) 741 { 742 /* 743 * This more or less just demuxes the plock request into any 
744 * one of three dlm calls. 745 * 746 * Internally, fs/dlm will pass these to a misc device, which 747 * a userspace daemon will read and write to. 748 * 749 * For now, cancel requests (which happen internally only), 750 * are turned into unlocks. Most of this function taken from 751 * gfs2_lock. 752 */ 753 754 if (cmd == F_CANCELLK) { 755 cmd = F_SETLK; 756 fl->fl_type = F_UNLCK; 757 } 758 759 if (IS_GETLK(cmd)) 760 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 761 else if (fl->fl_type == F_UNLCK) 762 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 763 else 764 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 765 } 766 767 /* 768 * Compare a requested locking protocol version against the current one. 769 * 770 * If the major numbers are different, they are incompatible. 771 * If the current minor is greater than the request, they are incompatible. 772 * If the current minor is less than or equal to the request, they are 773 * compatible, and the requester should run at the current minor version. 774 */ 775 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 776 struct ocfs2_protocol_version *request) 777 { 778 if (existing->pv_major != request->pv_major) 779 return 1; 780 781 if (existing->pv_minor > request->pv_minor) 782 return 1; 783 784 if (existing->pv_minor < request->pv_minor) 785 request->pv_minor = existing->pv_minor; 786 787 return 0; 788 } 789 790 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 791 { 792 struct ocfs2_protocol_version *pv = 793 (struct ocfs2_protocol_version *)lvb; 794 /* 795 * ocfs2_protocol_version has two u8 variables, so we don't 796 * need any endian conversion. 
797 */ 798 ver->pv_major = pv->pv_major; 799 ver->pv_minor = pv->pv_minor; 800 } 801 802 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 803 { 804 struct ocfs2_protocol_version *pv = 805 (struct ocfs2_protocol_version *)lvb; 806 /* 807 * ocfs2_protocol_version has two u8 variables, so we don't 808 * need any endian conversion. 809 */ 810 pv->pv_major = ver->pv_major; 811 pv->pv_minor = ver->pv_minor; 812 } 813 814 static void sync_wait_cb(void *arg) 815 { 816 struct ocfs2_cluster_connection *conn = arg; 817 struct ocfs2_live_connection *lc = conn->cc_private; 818 complete(&lc->oc_sync_wait); 819 } 820 821 static int sync_unlock(struct ocfs2_cluster_connection *conn, 822 struct dlm_lksb *lksb, char *name) 823 { 824 int error; 825 struct ocfs2_live_connection *lc = conn->cc_private; 826 827 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 828 if (error) { 829 printk(KERN_ERR "%s lkid %x error %d\n", 830 name, lksb->sb_lkid, error); 831 return error; 832 } 833 834 wait_for_completion(&lc->oc_sync_wait); 835 836 if (lksb->sb_status != -DLM_EUNLOCK) { 837 printk(KERN_ERR "%s lkid %x status %d\n", 838 name, lksb->sb_lkid, lksb->sb_status); 839 return -1; 840 } 841 return 0; 842 } 843 844 static int sync_lock(struct ocfs2_cluster_connection *conn, 845 int mode, uint32_t flags, 846 struct dlm_lksb *lksb, char *name) 847 { 848 int error, status; 849 struct ocfs2_live_connection *lc = conn->cc_private; 850 851 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 852 name, strlen(name), 853 0, sync_wait_cb, conn, NULL); 854 if (error) { 855 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 856 name, lksb->sb_lkid, flags, mode, error); 857 return error; 858 } 859 860 wait_for_completion(&lc->oc_sync_wait); 861 862 status = lksb->sb_status; 863 864 if (status && status != -EAGAIN) { 865 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 866 name, lksb->sb_lkid, flags, mode, status); 867 } 868 869 return status; 
870 } 871 872 873 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 874 int flags) 875 { 876 struct ocfs2_live_connection *lc = conn->cc_private; 877 return sync_lock(conn, mode, flags, 878 &lc->oc_version_lksb, VERSION_LOCK); 879 } 880 881 static int version_unlock(struct ocfs2_cluster_connection *conn) 882 { 883 struct ocfs2_live_connection *lc = conn->cc_private; 884 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 885 } 886 887 /* get_protocol_version() 888 * 889 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 890 * The algorithm is: 891 * 1. Attempt to take the lock in EX mode (non-blocking). 892 * 2. If successful (which means it is the first mount), write the 893 * version number and downconvert to PR lock. 894 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 895 * taking the PR lock. 896 */ 897 898 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 899 { 900 int ret; 901 struct ocfs2_live_connection *lc = conn->cc_private; 902 struct ocfs2_protocol_version pv; 903 904 running_proto.pv_major = 905 ocfs2_user_plugin.sp_max_proto.pv_major; 906 running_proto.pv_minor = 907 ocfs2_user_plugin.sp_max_proto.pv_minor; 908 909 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 910 ret = version_lock(conn, DLM_LOCK_EX, 911 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 912 if (!ret) { 913 conn->cc_version.pv_major = running_proto.pv_major; 914 conn->cc_version.pv_minor = running_proto.pv_minor; 915 version_to_lvb(&running_proto, lc->oc_lvb); 916 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 917 } else if (ret == -EAGAIN) { 918 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 919 if (ret) 920 goto out; 921 lvb_to_version(lc->oc_lvb, &pv); 922 923 if ((pv.pv_major != running_proto.pv_major) || 924 (pv.pv_minor > running_proto.pv_minor)) { 925 ret = -EINVAL; 926 goto out; 927 } 928 929 conn->cc_version.pv_major = pv.pv_major; 930 conn->cc_version.pv_minor = 
pv.pv_minor; 931 } 932 out: 933 return ret; 934 } 935 936 static void user_recover_prep(void *arg) 937 { 938 } 939 940 static void user_recover_slot(void *arg, struct dlm_slot *slot) 941 { 942 struct ocfs2_cluster_connection *conn = arg; 943 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", 944 slot->nodeid, slot->slot); 945 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 946 947 } 948 949 static void user_recover_done(void *arg, struct dlm_slot *slots, 950 int num_slots, int our_slot, 951 uint32_t generation) 952 { 953 struct ocfs2_cluster_connection *conn = arg; 954 struct ocfs2_live_connection *lc = conn->cc_private; 955 int i; 956 957 for (i = 0; i < num_slots; i++) 958 if (slots[i].slot == our_slot) { 959 atomic_set(&lc->oc_this_node, slots[i].nodeid); 960 break; 961 } 962 963 lc->oc_our_slot = our_slot; 964 wake_up(&lc->oc_wait); 965 } 966 967 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 968 .recover_prep = user_recover_prep, 969 .recover_slot = user_recover_slot, 970 .recover_done = user_recover_done, 971 }; 972 973 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 974 { 975 version_unlock(conn); 976 dlm_release_lockspace(conn->cc_lockspace, 2); 977 conn->cc_lockspace = NULL; 978 ocfs2_live_connection_drop(conn->cc_private); 979 conn->cc_private = NULL; 980 return 0; 981 } 982 983 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 984 { 985 dlm_lockspace_t *fsdlm; 986 struct ocfs2_live_connection *lc; 987 int rc, ops_rv; 988 989 BUG_ON(conn == NULL); 990 991 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 992 if (!lc) 993 return -ENOMEM; 994 995 init_waitqueue_head(&lc->oc_wait); 996 init_completion(&lc->oc_sync_wait); 997 atomic_set(&lc->oc_this_node, 0); 998 conn->cc_private = lc; 999 lc->oc_type = NO_CONTROLD; 1000 1001 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 1002 DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN, 1003 &ocfs2_ls_ops, 
conn, &ops_rv, &fsdlm); 1004 if (rc) { 1005 if (rc == -EEXIST || rc == -EPROTO) 1006 printk(KERN_ERR "ocfs2: Unable to create the " 1007 "lockspace %s (%d), because a ocfs2-tools " 1008 "program is running on this file system " 1009 "with the same name lockspace\n", 1010 conn->cc_name, rc); 1011 goto out; 1012 } 1013 1014 if (ops_rv == -EOPNOTSUPP) { 1015 lc->oc_type = WITH_CONTROLD; 1016 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 1017 "version of dlm_controld and/or ocfs2-tools." 1018 " Please consider upgrading.\n"); 1019 } else if (ops_rv) { 1020 rc = ops_rv; 1021 goto out; 1022 } 1023 conn->cc_lockspace = fsdlm; 1024 1025 rc = ocfs2_live_connection_attach(conn, lc); 1026 if (rc) 1027 goto out; 1028 1029 if (lc->oc_type == NO_CONTROLD) { 1030 rc = get_protocol_version(conn); 1031 if (rc) { 1032 printk(KERN_ERR "ocfs2: Could not determine" 1033 " locking version\n"); 1034 user_cluster_disconnect(conn); 1035 goto out; 1036 } 1037 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1038 } 1039 1040 /* 1041 * running_proto must have been set before we allowed any mounts 1042 * to proceed. 
1043 */ 1044 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1045 printk(KERN_ERR 1046 "Unable to mount with fs locking protocol version " 1047 "%u.%u because negotiated protocol is %u.%u\n", 1048 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1049 running_proto.pv_major, running_proto.pv_minor); 1050 rc = -EPROTO; 1051 ocfs2_live_connection_drop(lc); 1052 lc = NULL; 1053 } 1054 1055 out: 1056 if (rc) 1057 kfree(lc); 1058 return rc; 1059 } 1060 1061 1062 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1063 unsigned int *this_node) 1064 { 1065 int rc; 1066 struct ocfs2_live_connection *lc = conn->cc_private; 1067 1068 if (lc->oc_type == WITH_CONTROLD) 1069 rc = ocfs2_control_get_this_node(); 1070 else if (lc->oc_type == NO_CONTROLD) 1071 rc = atomic_read(&lc->oc_this_node); 1072 else 1073 rc = -EINVAL; 1074 1075 if (rc < 0) 1076 return rc; 1077 1078 *this_node = rc; 1079 return 0; 1080 } 1081 1082 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1083 .connect = user_cluster_connect, 1084 .disconnect = user_cluster_disconnect, 1085 .this_node = user_cluster_this_node, 1086 .dlm_lock = user_dlm_lock, 1087 .dlm_unlock = user_dlm_unlock, 1088 .lock_status = user_dlm_lock_status, 1089 .lvb_valid = user_dlm_lvb_valid, 1090 .lock_lvb = user_dlm_lvb, 1091 .plock = user_plock, 1092 .dump_lksb = user_dlm_dump_lksb, 1093 }; 1094 1095 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1096 .sp_name = "user", 1097 .sp_ops = &ocfs2_user_plugin_ops, 1098 .sp_owner = THIS_MODULE, 1099 }; 1100 1101 1102 static int __init ocfs2_user_plugin_init(void) 1103 { 1104 int rc; 1105 1106 rc = ocfs2_control_init(); 1107 if (!rc) { 1108 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1109 if (rc) 1110 ocfs2_control_exit(); 1111 } 1112 1113 return rc; 1114 } 1115 1116 static void __exit ocfs2_user_plugin_exit(void) 1117 { 1118 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1119 ocfs2_control_exit(); 1120 } 1121 1122 
/* Module metadata and the init/exit entry points defined in this file. */
MODULE_AUTHOR("Oracle");
MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
MODULE_LICENSE("GPL");
module_init(ocfs2_user_plugin_init);
module_exit(ocfs2_user_plugin_exit);