1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * stack_user.c 5 * 6 * Code which interfaces ocfs2 with fs/dlm and a userspace stack. 7 * 8 * Copyright (C) 2007 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation, version 2. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 * General Public License for more details. 18 */ 19 20 #include <linux/module.h> 21 #include <linux/fs.h> 22 #include <linux/miscdevice.h> 23 #include <linux/mutex.h> 24 #include <linux/slab.h> 25 #include <linux/reboot.h> 26 #include <linux/sched.h> 27 #include <asm/uaccess.h> 28 29 #include "stackglue.h" 30 31 #include <linux/dlm_plock.h> 32 33 /* 34 * The control protocol starts with a handshake. Until the handshake 35 * is complete, the control device will fail all write(2)s. 36 * 37 * The handshake is simple. First, the client reads until EOF. Each line 38 * of output is a supported protocol tag. All protocol tags are a single 39 * character followed by a two hex digit version number. Currently the 40 * only things supported is T01, for "Text-base version 0x01". Next, the 41 * client writes the version they would like to use, including the newline. 42 * Thus, the protocol tag is 'T01\n'. If the version tag written is 43 * unknown, -EINVAL is returned. Once the negotiation is complete, the 44 * client can start sending messages. 45 * 46 * The T01 protocol has three messages. First is the "SETN" message. 47 * It has the following syntax: 48 * 49 * SETN<space><8-char-hex-nodenum><newline> 50 * 51 * This is 14 characters. 52 * 53 * The "SETN" message must be the first message following the protocol. 54 * It tells ocfs2_control the local node number. 55 * 56 * Next comes the "SETV" message. It has the following syntax: 57 * 58 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> 59 * 60 * This is 11 characters. 61 * 62 * The "SETV" message sets the filesystem locking protocol version as 63 * negotiated by the client. The client negotiates based on the maximum 64 * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major 65 * number from the "SETV" message must match 66 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number 67 * must be less than or equal to ...sp_max_version.pv_minor. 68 * 69 * Once this information has been set, mounts will be allowed. From this 70 * point on, the "DOWN" message can be sent for node down notification. 71 * It has the following syntax: 72 * 73 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> 74 * 75 * eg: 76 * 77 * DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n 78 * 79 * This is 47 characters. 80 */ 81 82 /* 83 * Whether or not the client has done the handshake. 84 * For now, we have just one protocol version. 85 */ 86 #define OCFS2_CONTROL_PROTO "T01\n" 87 #define OCFS2_CONTROL_PROTO_LEN 4 88 89 /* Handshake states */ 90 #define OCFS2_CONTROL_HANDSHAKE_INVALID (0) 91 #define OCFS2_CONTROL_HANDSHAKE_READ (1) 92 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2) 93 #define OCFS2_CONTROL_HANDSHAKE_VALID (3) 94 95 /* Messages */ 96 #define OCFS2_CONTROL_MESSAGE_OP_LEN 4 97 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN" 98 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14 99 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV" 100 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11 101 #define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN" 102 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47 103 #define OCFS2_TEXT_UUID_LEN 32 104 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2 105 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8 106 #define VERSION_LOCK "version_lock" 107 108 enum ocfs2_connection_type { 109 WITH_CONTROLD, 110 NO_CONTROLD 111 }; 112 113 /* 114 * ocfs2_live_connection is refcounted because the filesystem and 115 * miscdevice sides can detach in different order. Let's just be safe. 116 */ 117 struct ocfs2_live_connection { 118 struct list_head oc_list; 119 struct ocfs2_cluster_connection *oc_conn; 120 enum ocfs2_connection_type oc_type; 121 atomic_t oc_this_node; 122 int oc_our_slot; 123 struct dlm_lksb oc_version_lksb; 124 char oc_lvb[DLM_LVB_LEN]; 125 struct completion oc_sync_wait; 126 wait_queue_head_t oc_wait; 127 }; 128 129 struct ocfs2_control_private { 130 struct list_head op_list; 131 int op_state; 132 int op_this_node; 133 struct ocfs2_protocol_version op_proto; 134 }; 135 136 /* SETN<space><8-char-hex-nodenum><newline> */ 137 struct ocfs2_control_message_setn { 138 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 139 char space; 140 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 141 char newline; 142 }; 143 144 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 145 struct ocfs2_control_message_setv { 146 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 147 char space1; 148 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 149 char space2; 150 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 151 char newline; 152 }; 153 154 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 155 struct ocfs2_control_message_down { 156 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 157 char space1; 158 char uuid[OCFS2_TEXT_UUID_LEN]; 159 char space2; 160 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 161 char newline; 162 }; 163 164 union ocfs2_control_message { 165 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 166 struct ocfs2_control_message_setn u_setn; 167 struct ocfs2_control_message_setv u_setv; 168 struct ocfs2_control_message_down u_down; 169 }; 170 171 static struct ocfs2_stack_plugin ocfs2_user_plugin; 172 173 static atomic_t ocfs2_control_opened; 174 static int ocfs2_control_this_node = -1; 175 static struct ocfs2_protocol_version running_proto; 176 177 static LIST_HEAD(ocfs2_live_connection_list); 178 static LIST_HEAD(ocfs2_control_private_list); 179 static DEFINE_MUTEX(ocfs2_control_lock); 180 181 static inline void ocfs2_control_set_handshake_state(struct file *file, 182 int state) 183 { 184 struct ocfs2_control_private *p = file->private_data; 185 p->op_state = state; 186 } 187 188 static inline int ocfs2_control_get_handshake_state(struct file *file) 189 { 190 struct ocfs2_control_private *p = file->private_data; 191 return p->op_state; 192 } 193 194 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 195 { 196 size_t len = strlen(name); 197 struct ocfs2_live_connection *c; 198 199 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 200 201 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 202 if ((c->oc_conn->cc_namelen == len) && 203 !strncmp(c->oc_conn->cc_name, name, len)) 204 return c; 205 } 206 207 return NULL; 208 } 209 210 /* 211 * ocfs2_live_connection structures are created underneath the ocfs2 212 * mount path. Since the VFS prevents multiple calls to 213 * fill_super(), we can't get dupes here. 214 */ 215 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 216 struct ocfs2_live_connection *c) 217 { 218 int rc = 0; 219 220 mutex_lock(&ocfs2_control_lock); 221 c->oc_conn = conn; 222 223 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 224 list_add(&c->oc_list, &ocfs2_live_connection_list); 225 else { 226 printk(KERN_ERR 227 "ocfs2: Userspace control daemon is not present\n"); 228 rc = -ESRCH; 229 } 230 231 mutex_unlock(&ocfs2_control_lock); 232 return rc; 233 } 234 235 /* 236 * This function disconnects the cluster connection from ocfs2_control. 237 * Afterwards, userspace can't affect the cluster connection. 238 */ 239 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 240 { 241 mutex_lock(&ocfs2_control_lock); 242 list_del_init(&c->oc_list); 243 c->oc_conn = NULL; 244 mutex_unlock(&ocfs2_control_lock); 245 246 kfree(c); 247 } 248 249 static int ocfs2_control_cfu(void *target, size_t target_len, 250 const char __user *buf, size_t count) 251 { 252 /* The T01 expects write(2) calls to have exactly one command */ 253 if ((count != target_len) || 254 (count > sizeof(union ocfs2_control_message))) 255 return -EINVAL; 256 257 if (copy_from_user(target, buf, target_len)) 258 return -EFAULT; 259 260 return 0; 261 } 262 263 static ssize_t ocfs2_control_validate_protocol(struct file *file, 264 const char __user *buf, 265 size_t count) 266 { 267 ssize_t ret; 268 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 269 270 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 271 buf, count); 272 if (ret) 273 return ret; 274 275 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 276 return -EINVAL; 277 278 ocfs2_control_set_handshake_state(file, 279 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 280 281 return count; 282 } 283 284 static void ocfs2_control_send_down(const char *uuid, 285 int nodenum) 286 { 287 struct ocfs2_live_connection *c; 288 289 mutex_lock(&ocfs2_control_lock); 290 291 c = ocfs2_connection_find(uuid); 292 if (c) { 293 BUG_ON(c->oc_conn == NULL); 294 c->oc_conn->cc_recovery_handler(nodenum, 295 c->oc_conn->cc_recovery_data); 296 } 297 298 mutex_unlock(&ocfs2_control_lock); 299 } 300 301 /* 302 * Called whenever configuration elements are sent to /dev/ocfs2_control. 303 * If all configuration elements are present, try to set the global 304 * values. If there is a problem, return an error. Skip any missing 305 * elements, and only bump ocfs2_control_opened when we have all elements 306 * and are successful. 307 */ 308 static int ocfs2_control_install_private(struct file *file) 309 { 310 int rc = 0; 311 int set_p = 1; 312 struct ocfs2_control_private *p = file->private_data; 313 314 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 315 316 mutex_lock(&ocfs2_control_lock); 317 318 if (p->op_this_node < 0) { 319 set_p = 0; 320 } else if ((ocfs2_control_this_node >= 0) && 321 (ocfs2_control_this_node != p->op_this_node)) { 322 rc = -EINVAL; 323 goto out_unlock; 324 } 325 326 if (!p->op_proto.pv_major) { 327 set_p = 0; 328 } else if (!list_empty(&ocfs2_live_connection_list) && 329 ((running_proto.pv_major != p->op_proto.pv_major) || 330 (running_proto.pv_minor != p->op_proto.pv_minor))) { 331 rc = -EINVAL; 332 goto out_unlock; 333 } 334 335 if (set_p) { 336 ocfs2_control_this_node = p->op_this_node; 337 running_proto.pv_major = p->op_proto.pv_major; 338 running_proto.pv_minor = p->op_proto.pv_minor; 339 } 340 341 out_unlock: 342 mutex_unlock(&ocfs2_control_lock); 343 344 if (!rc && set_p) { 345 /* We set the global values successfully */ 346 atomic_inc(&ocfs2_control_opened); 347 ocfs2_control_set_handshake_state(file, 348 OCFS2_CONTROL_HANDSHAKE_VALID); 349 } 350 351 return rc; 352 } 353 354 static int ocfs2_control_get_this_node(void) 355 { 356 int rc; 357 358 mutex_lock(&ocfs2_control_lock); 359 if (ocfs2_control_this_node < 0) 360 rc = -EINVAL; 361 else 362 rc = ocfs2_control_this_node; 363 mutex_unlock(&ocfs2_control_lock); 364 365 return rc; 366 } 367 368 static int ocfs2_control_do_setnode_msg(struct file *file, 369 struct ocfs2_control_message_setn *msg) 370 { 371 long nodenum; 372 char *ptr = NULL; 373 struct ocfs2_control_private *p = file->private_data; 374 375 if (ocfs2_control_get_handshake_state(file) != 376 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 377 return -EINVAL; 378 379 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 380 OCFS2_CONTROL_MESSAGE_OP_LEN)) 381 return -EINVAL; 382 383 if ((msg->space != ' ') || (msg->newline != '\n')) 384 return -EINVAL; 385 msg->space = msg->newline = '\0'; 386 387 nodenum = simple_strtol(msg->nodestr, &ptr, 16); 388 if (!ptr || *ptr) 389 return -EINVAL; 390 391 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 392 (nodenum > INT_MAX) || (nodenum < 0)) 393 return -ERANGE; 394 p->op_this_node = nodenum; 395 396 return ocfs2_control_install_private(file); 397 } 398 399 static int ocfs2_control_do_setversion_msg(struct file *file, 400 struct ocfs2_control_message_setv *msg) 401 { 402 long major, minor; 403 char *ptr = NULL; 404 struct ocfs2_control_private *p = file->private_data; 405 struct ocfs2_protocol_version *max = 406 &ocfs2_user_plugin.sp_max_proto; 407 408 if (ocfs2_control_get_handshake_state(file) != 409 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 410 return -EINVAL; 411 412 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 413 OCFS2_CONTROL_MESSAGE_OP_LEN)) 414 return -EINVAL; 415 416 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 417 (msg->newline != '\n')) 418 return -EINVAL; 419 msg->space1 = msg->space2 = msg->newline = '\0'; 420 421 major = simple_strtol(msg->major, &ptr, 16); 422 if (!ptr || *ptr) 423 return -EINVAL; 424 minor = simple_strtol(msg->minor, &ptr, 16); 425 if (!ptr || *ptr) 426 return -EINVAL; 427 428 /* 429 * The major must be between 1 and 255, inclusive. The minor 430 * must be between 0 and 255, inclusive. The version passed in 431 * must be within the maximum version supported by the filesystem. 432 */ 433 if ((major == LONG_MIN) || (major == LONG_MAX) || 434 (major > (u8)-1) || (major < 1)) 435 return -ERANGE; 436 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 437 (minor > (u8)-1) || (minor < 0)) 438 return -ERANGE; 439 if ((major != max->pv_major) || 440 (minor > max->pv_minor)) 441 return -EINVAL; 442 443 p->op_proto.pv_major = major; 444 p->op_proto.pv_minor = minor; 445 446 return ocfs2_control_install_private(file); 447 } 448 449 static int ocfs2_control_do_down_msg(struct file *file, 450 struct ocfs2_control_message_down *msg) 451 { 452 long nodenum; 453 char *p = NULL; 454 455 if (ocfs2_control_get_handshake_state(file) != 456 OCFS2_CONTROL_HANDSHAKE_VALID) 457 return -EINVAL; 458 459 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 460 OCFS2_CONTROL_MESSAGE_OP_LEN)) 461 return -EINVAL; 462 463 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 464 (msg->newline != '\n')) 465 return -EINVAL; 466 msg->space1 = msg->space2 = msg->newline = '\0'; 467 468 nodenum = simple_strtol(msg->nodestr, &p, 16); 469 if (!p || *p) 470 return -EINVAL; 471 472 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 473 (nodenum > INT_MAX) || (nodenum < 0)) 474 return -ERANGE; 475 476 ocfs2_control_send_down(msg->uuid, nodenum); 477 478 return 0; 479 } 480 481 static ssize_t ocfs2_control_message(struct file *file, 482 const char __user *buf, 483 size_t count) 484 { 485 ssize_t ret; 486 union ocfs2_control_message msg; 487 488 /* Try to catch padding issues */ 489 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 490 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 491 492 memset(&msg, 0, sizeof(union ocfs2_control_message)); 493 ret = ocfs2_control_cfu(&msg, count, buf, count); 494 if (ret) 495 goto out; 496 497 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 498 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 499 OCFS2_CONTROL_MESSAGE_OP_LEN)) 500 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 501 else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 502 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 503 OCFS2_CONTROL_MESSAGE_OP_LEN)) 504 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 505 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 506 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 507 OCFS2_CONTROL_MESSAGE_OP_LEN)) 508 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 509 else 510 ret = -EINVAL; 511 512 out: 513 return ret ? ret : count; 514 } 515 516 static ssize_t ocfs2_control_write(struct file *file, 517 const char __user *buf, 518 size_t count, 519 loff_t *ppos) 520 { 521 ssize_t ret; 522 523 switch (ocfs2_control_get_handshake_state(file)) { 524 case OCFS2_CONTROL_HANDSHAKE_INVALID: 525 ret = -EINVAL; 526 break; 527 528 case OCFS2_CONTROL_HANDSHAKE_READ: 529 ret = ocfs2_control_validate_protocol(file, buf, 530 count); 531 break; 532 533 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 534 case OCFS2_CONTROL_HANDSHAKE_VALID: 535 ret = ocfs2_control_message(file, buf, count); 536 break; 537 538 default: 539 BUG(); 540 ret = -EIO; 541 break; 542 } 543 544 return ret; 545 } 546 547 /* 548 * This is a naive version. If we ever have a new protocol, we'll expand 549 * it. Probably using seq_file. 550 */ 551 static ssize_t ocfs2_control_read(struct file *file, 552 char __user *buf, 553 size_t count, 554 loff_t *ppos) 555 { 556 ssize_t ret; 557 558 ret = simple_read_from_buffer(buf, count, ppos, 559 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 560 561 /* Have we read the whole protocol list? */ 562 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 563 ocfs2_control_set_handshake_state(file, 564 OCFS2_CONTROL_HANDSHAKE_READ); 565 566 return ret; 567 } 568 569 static int ocfs2_control_release(struct inode *inode, struct file *file) 570 { 571 struct ocfs2_control_private *p = file->private_data; 572 573 mutex_lock(&ocfs2_control_lock); 574 575 if (ocfs2_control_get_handshake_state(file) != 576 OCFS2_CONTROL_HANDSHAKE_VALID) 577 goto out; 578 579 if (atomic_dec_and_test(&ocfs2_control_opened)) { 580 if (!list_empty(&ocfs2_live_connection_list)) { 581 /* XXX: Do bad things! */ 582 printk(KERN_ERR 583 "ocfs2: Unexpected release of ocfs2_control!\n" 584 " Loss of cluster connection requires " 585 "an emergency restart!\n"); 586 emergency_restart(); 587 } 588 /* 589 * Last valid close clears the node number and resets 590 * the locking protocol version 591 */ 592 ocfs2_control_this_node = -1; 593 running_proto.pv_major = 0; 594 running_proto.pv_minor = 0; 595 } 596 597 out: 598 list_del_init(&p->op_list); 599 file->private_data = NULL; 600 601 mutex_unlock(&ocfs2_control_lock); 602 603 kfree(p); 604 605 return 0; 606 } 607 608 static int ocfs2_control_open(struct inode *inode, struct file *file) 609 { 610 struct ocfs2_control_private *p; 611 612 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 613 if (!p) 614 return -ENOMEM; 615 p->op_this_node = -1; 616 617 mutex_lock(&ocfs2_control_lock); 618 file->private_data = p; 619 list_add(&p->op_list, &ocfs2_control_private_list); 620 mutex_unlock(&ocfs2_control_lock); 621 622 return 0; 623 } 624 625 static const struct file_operations ocfs2_control_fops = { 626 .open = ocfs2_control_open, 627 .release = ocfs2_control_release, 628 .read = ocfs2_control_read, 629 .write = ocfs2_control_write, 630 .owner = THIS_MODULE, 631 .llseek = default_llseek, 632 }; 633 634 static struct miscdevice ocfs2_control_device = { 635 .minor = MISC_DYNAMIC_MINOR, 636 .name = "ocfs2_control", 637 .fops = &ocfs2_control_fops, 638 }; 639 640 static int ocfs2_control_init(void) 641 { 642 int rc; 643 644 atomic_set(&ocfs2_control_opened, 0); 645 646 rc = misc_register(&ocfs2_control_device); 647 if (rc) 648 printk(KERN_ERR 649 "ocfs2: Unable to register ocfs2_control device " 650 "(errno %d)\n", 651 -rc); 652 653 return rc; 654 } 655 656 static void ocfs2_control_exit(void) 657 { 658 int rc; 659 660 rc = misc_deregister(&ocfs2_control_device); 661 if (rc) 662 printk(KERN_ERR 663 "ocfs2: Unable to deregister ocfs2_control device " 664 "(errno %d)\n", 665 -rc); 666 } 667 668 static void fsdlm_lock_ast_wrapper(void *astarg) 669 { 670 struct ocfs2_dlm_lksb *lksb = astarg; 671 int status = lksb->lksb_fsdlm.sb_status; 672 673 /* 674 * For now we're punting on the issue of other non-standard errors 675 * where we can't tell if the unlock_ast or lock_ast should be called. 676 * The main "other error" that's possible is EINVAL which means the 677 * function was called with invalid args, which shouldn't be possible 678 * since the caller here is under our control. Other non-standard 679 * errors probably fall into the same category, or otherwise are fatal 680 * which means we can't carry on anyway. 681 */ 682 683 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 684 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 685 else 686 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 687 } 688 689 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 690 { 691 struct ocfs2_dlm_lksb *lksb = astarg; 692 693 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 694 } 695 696 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 697 int mode, 698 struct ocfs2_dlm_lksb *lksb, 699 u32 flags, 700 void *name, 701 unsigned int namelen) 702 { 703 int ret; 704 705 if (!lksb->lksb_fsdlm.sb_lvbptr) 706 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 707 sizeof(struct dlm_lksb); 708 709 ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 710 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 711 fsdlm_lock_ast_wrapper, lksb, 712 fsdlm_blocking_ast_wrapper); 713 return ret; 714 } 715 716 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 717 struct ocfs2_dlm_lksb *lksb, 718 u32 flags) 719 { 720 int ret; 721 722 ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 723 flags, &lksb->lksb_fsdlm, lksb); 724 return ret; 725 } 726 727 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 728 { 729 return lksb->lksb_fsdlm.sb_status; 730 } 731 732 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 733 { 734 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 735 736 return !invalid; 737 } 738 739 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 740 { 741 if (!lksb->lksb_fsdlm.sb_lvbptr) 742 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 743 sizeof(struct dlm_lksb); 744 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 745 } 746 747 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 748 { 749 } 750 751 static int user_plock(struct ocfs2_cluster_connection *conn, 752 u64 ino, 753 struct file *file, 754 int cmd, 755 struct file_lock *fl) 756 { 757 /* 758 * This more or less just demuxes the plock request into any 759 * one of three dlm calls. 760 * 761 * Internally, fs/dlm will pass these to a misc device, which 762 * a userspace daemon will read and write to. 763 * 764 * For now, cancel requests (which happen internally only), 765 * are turned into unlocks. Most of this function taken from 766 * gfs2_lock. 767 */ 768 769 if (cmd == F_CANCELLK) { 770 cmd = F_SETLK; 771 fl->fl_type = F_UNLCK; 772 } 773 774 if (IS_GETLK(cmd)) 775 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 776 else if (fl->fl_type == F_UNLCK) 777 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 778 else 779 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 780 } 781 782 /* 783 * Compare a requested locking protocol version against the current one. 784 * 785 * If the major numbers are different, they are incompatible. 786 * If the current minor is greater than the request, they are incompatible. 787 * If the current minor is less than or equal to the request, they are 788 * compatible, and the requester should run at the current minor version. 789 */ 790 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 791 struct ocfs2_protocol_version *request) 792 { 793 if (existing->pv_major != request->pv_major) 794 return 1; 795 796 if (existing->pv_minor > request->pv_minor) 797 return 1; 798 799 if (existing->pv_minor < request->pv_minor) 800 request->pv_minor = existing->pv_minor; 801 802 return 0; 803 } 804 805 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 806 { 807 struct ocfs2_protocol_version *pv = 808 (struct ocfs2_protocol_version *)lvb; 809 /* 810 * ocfs2_protocol_version has two u8 variables, so we don't 811 * need any endian conversion. 812 */ 813 ver->pv_major = pv->pv_major; 814 ver->pv_minor = pv->pv_minor; 815 } 816 817 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 818 { 819 struct ocfs2_protocol_version *pv = 820 (struct ocfs2_protocol_version *)lvb; 821 /* 822 * ocfs2_protocol_version has two u8 variables, so we don't 823 * need any endian conversion. 824 */ 825 pv->pv_major = ver->pv_major; 826 pv->pv_minor = ver->pv_minor; 827 } 828 829 static void sync_wait_cb(void *arg) 830 { 831 struct ocfs2_cluster_connection *conn = arg; 832 struct ocfs2_live_connection *lc = conn->cc_private; 833 complete(&lc->oc_sync_wait); 834 } 835 836 static int sync_unlock(struct ocfs2_cluster_connection *conn, 837 struct dlm_lksb *lksb, char *name) 838 { 839 int error; 840 struct ocfs2_live_connection *lc = conn->cc_private; 841 842 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 843 if (error) { 844 printk(KERN_ERR "%s lkid %x error %d\n", 845 name, lksb->sb_lkid, error); 846 return error; 847 } 848 849 wait_for_completion(&lc->oc_sync_wait); 850 851 if (lksb->sb_status != -DLM_EUNLOCK) { 852 printk(KERN_ERR "%s lkid %x status %d\n", 853 name, lksb->sb_lkid, lksb->sb_status); 854 return -1; 855 } 856 return 0; 857 } 858 859 static int sync_lock(struct ocfs2_cluster_connection *conn, 860 int mode, uint32_t flags, 861 struct dlm_lksb *lksb, char *name) 862 { 863 int error, status; 864 struct ocfs2_live_connection *lc = conn->cc_private; 865 866 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 867 name, strlen(name), 868 0, sync_wait_cb, conn, NULL); 869 if (error) { 870 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 871 name, lksb->sb_lkid, flags, mode, error); 872 return error; 873 } 874 875 wait_for_completion(&lc->oc_sync_wait); 876 877 status = lksb->sb_status; 878 879 if (status && status != -EAGAIN) { 880 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 881 name, lksb->sb_lkid, flags, mode, status); 882 } 883 884 return status; 885 } 886 887 888 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 889 int flags) 890 { 891 struct ocfs2_live_connection *lc = conn->cc_private; 892 return sync_lock(conn, mode, flags, 893 &lc->oc_version_lksb, VERSION_LOCK); 894 } 895 896 static int version_unlock(struct ocfs2_cluster_connection *conn) 897 { 898 struct ocfs2_live_connection *lc = conn->cc_private; 899 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 900 } 901 902 /* get_protocol_version() 903 * 904 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 905 * The algorithm is: 906 * 1. Attempt to take the lock in EX mode (non-blocking). 907 * 2. If successful (which means it is the first mount), write the 908 * version number and downconvert to PR lock. 909 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 910 * taking the PR lock. 911 */ 912 913 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 914 { 915 int ret; 916 struct ocfs2_live_connection *lc = conn->cc_private; 917 struct ocfs2_protocol_version pv; 918 919 running_proto.pv_major = 920 ocfs2_user_plugin.sp_max_proto.pv_major; 921 running_proto.pv_minor = 922 ocfs2_user_plugin.sp_max_proto.pv_minor; 923 924 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 925 ret = version_lock(conn, DLM_LOCK_EX, 926 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 927 if (!ret) { 928 conn->cc_version.pv_major = running_proto.pv_major; 929 conn->cc_version.pv_minor = running_proto.pv_minor; 930 version_to_lvb(&running_proto, lc->oc_lvb); 931 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 932 } else if (ret == -EAGAIN) { 933 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 934 if (ret) 935 goto out; 936 lvb_to_version(lc->oc_lvb, &pv); 937 938 if ((pv.pv_major != running_proto.pv_major) || 939 (pv.pv_minor > running_proto.pv_minor)) { 940 ret = -EINVAL; 941 goto out; 942 } 943 944 conn->cc_version.pv_major = pv.pv_major; 945 conn->cc_version.pv_minor = pv.pv_minor; 946 } 947 out: 948 return ret; 949 } 950 951 static void user_recover_prep(void *arg) 952 { 953 } 954 955 static void user_recover_slot(void *arg, struct dlm_slot *slot) 956 { 957 struct ocfs2_cluster_connection *conn = arg; 958 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", 959 slot->nodeid, slot->slot); 960 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 961 962 } 963 964 static void user_recover_done(void *arg, struct dlm_slot *slots, 965 int num_slots, int our_slot, 966 uint32_t generation) 967 { 968 struct ocfs2_cluster_connection *conn = arg; 969 struct ocfs2_live_connection *lc = conn->cc_private; 970 int i; 971 972 for (i = 0; i < num_slots; i++) 973 if (slots[i].slot == our_slot) { 974 atomic_set(&lc->oc_this_node, slots[i].nodeid); 975 break; 976 } 977 978 lc->oc_our_slot = our_slot; 979 wake_up(&lc->oc_wait); 980 } 981 982 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 983 .recover_prep = user_recover_prep, 984 .recover_slot = user_recover_slot, 985 .recover_done = user_recover_done, 986 }; 987 988 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 989 { 990 version_unlock(conn); 991 dlm_release_lockspace(conn->cc_lockspace, 2); 992 conn->cc_lockspace = NULL; 993 ocfs2_live_connection_drop(conn->cc_private); 994 conn->cc_private = NULL; 995 return 0; 996 } 997 998 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 999 { 1000 dlm_lockspace_t *fsdlm; 1001 struct ocfs2_live_connection *lc; 1002 int rc, ops_rv; 1003 1004 BUG_ON(conn == NULL); 1005 1006 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 1007 if (!lc) { 1008 rc = -ENOMEM; 1009 goto out; 1010 } 1011 1012 init_waitqueue_head(&lc->oc_wait); 1013 init_completion(&lc->oc_sync_wait); 1014 atomic_set(&lc->oc_this_node, 0); 1015 conn->cc_private = lc; 1016 lc->oc_type = NO_CONTROLD; 1017 1018 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 1019 DLM_LSFL_FS, DLM_LVB_LEN, 1020 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); 1021 if (rc) 1022 goto out; 1023 1024 if (ops_rv == -EOPNOTSUPP) { 1025 lc->oc_type = WITH_CONTROLD; 1026 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 1027 "version of dlm_controld and/or ocfs2-tools." 1028 " Please consider upgrading.\n"); 1029 } else if (ops_rv) { 1030 rc = ops_rv; 1031 goto out; 1032 } 1033 conn->cc_lockspace = fsdlm; 1034 1035 rc = ocfs2_live_connection_attach(conn, lc); 1036 if (rc) 1037 goto out; 1038 1039 if (lc->oc_type == NO_CONTROLD) { 1040 rc = get_protocol_version(conn); 1041 if (rc) { 1042 printk(KERN_ERR "ocfs2: Could not determine" 1043 " locking version\n"); 1044 user_cluster_disconnect(conn); 1045 goto out; 1046 } 1047 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1048 } 1049 1050 /* 1051 * running_proto must have been set before we allowed any mounts 1052 * to proceed. 1053 */ 1054 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1055 printk(KERN_ERR 1056 "Unable to mount with fs locking protocol version " 1057 "%u.%u because negotiated protocol is %u.%u\n", 1058 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1059 running_proto.pv_major, running_proto.pv_minor); 1060 rc = -EPROTO; 1061 ocfs2_live_connection_drop(lc); 1062 lc = NULL; 1063 } 1064 1065 out: 1066 if (rc && lc) 1067 kfree(lc); 1068 return rc; 1069 } 1070 1071 1072 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1073 unsigned int *this_node) 1074 { 1075 int rc; 1076 struct ocfs2_live_connection *lc = conn->cc_private; 1077 1078 if (lc->oc_type == WITH_CONTROLD) 1079 rc = ocfs2_control_get_this_node(); 1080 else if (lc->oc_type == NO_CONTROLD) 1081 rc = atomic_read(&lc->oc_this_node); 1082 else 1083 rc = -EINVAL; 1084 1085 if (rc < 0) 1086 return rc; 1087 1088 *this_node = rc; 1089 return 0; 1090 } 1091 1092 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1093 .connect = user_cluster_connect, 1094 .disconnect = user_cluster_disconnect, 1095 .this_node = user_cluster_this_node, 1096 .dlm_lock = user_dlm_lock, 1097 .dlm_unlock = user_dlm_unlock, 1098 .lock_status = user_dlm_lock_status, 1099 .lvb_valid = user_dlm_lvb_valid, 1100 .lock_lvb = user_dlm_lvb, 1101 .plock = user_plock, 1102 .dump_lksb = user_dlm_dump_lksb, 1103 }; 1104 1105 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1106 .sp_name = "user", 1107 .sp_ops = &ocfs2_user_plugin_ops, 1108 .sp_owner = THIS_MODULE, 1109 }; 1110 1111 1112 static int __init ocfs2_user_plugin_init(void) 1113 { 1114 int rc; 1115 1116 rc = ocfs2_control_init(); 1117 if (!rc) { 1118 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1119 if (rc) 1120 ocfs2_control_exit(); 1121 } 1122 1123 return rc; 1124 } 1125 1126 static void __exit ocfs2_user_plugin_exit(void) 1127 { 1128 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1129 ocfs2_control_exit(); 1130 } 1131 1132 MODULE_AUTHOR("Oracle"); 1133 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks"); 1134 MODULE_LICENSE("GPL"); 1135 module_init(ocfs2_user_plugin_init); 1136 module_exit(ocfs2_user_plugin_exit); 1137