1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * stack_user.c 5 * 6 * Code which interfaces ocfs2 with fs/dlm and a userspace stack. 7 * 8 * Copyright (C) 2007 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation, version 2. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 * General Public License for more details. 18 */ 19 20 #include <linux/module.h> 21 #include <linux/fs.h> 22 #include <linux/miscdevice.h> 23 #include <linux/mutex.h> 24 #include <linux/slab.h> 25 #include <linux/reboot.h> 26 #include <linux/sched.h> 27 #include <asm/uaccess.h> 28 29 #include "stackglue.h" 30 31 #include <linux/dlm_plock.h> 32 33 /* 34 * The control protocol starts with a handshake. Until the handshake 35 * is complete, the control device will fail all write(2)s. 36 * 37 * The handshake is simple. First, the client reads until EOF. Each line 38 * of output is a supported protocol tag. All protocol tags are a single 39 * character followed by a two hex digit version number. Currently the 40 * only things supported is T01, for "Text-base version 0x01". Next, the 41 * client writes the version they would like to use, including the newline. 42 * Thus, the protocol tag is 'T01\n'. If the version tag written is 43 * unknown, -EINVAL is returned. Once the negotiation is complete, the 44 * client can start sending messages. 45 * 46 * The T01 protocol has three messages. First is the "SETN" message. 47 * It has the following syntax: 48 * 49 * SETN<space><8-char-hex-nodenum><newline> 50 * 51 * This is 14 characters. 52 * 53 * The "SETN" message must be the first message following the protocol. 54 * It tells ocfs2_control the local node number. 55 * 56 * Next comes the "SETV" message. It has the following syntax: 57 * 58 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> 59 * 60 * This is 11 characters. 61 * 62 * The "SETV" message sets the filesystem locking protocol version as 63 * negotiated by the client. The client negotiates based on the maximum 64 * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major 65 * number from the "SETV" message must match 66 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number 67 * must be less than or equal to ...sp_max_version.pv_minor. 68 * 69 * Once this information has been set, mounts will be allowed. From this 70 * point on, the "DOWN" message can be sent for node down notification. 71 * It has the following syntax: 72 * 73 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> 74 * 75 * eg: 76 * 77 * DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n 78 * 79 * This is 47 characters. 80 */ 81 82 /* 83 * Whether or not the client has done the handshake. 84 * For now, we have just one protocol version. 85 */ 86 #define OCFS2_CONTROL_PROTO "T01\n" 87 #define OCFS2_CONTROL_PROTO_LEN 4 88 89 /* Handshake states */ 90 #define OCFS2_CONTROL_HANDSHAKE_INVALID (0) 91 #define OCFS2_CONTROL_HANDSHAKE_READ (1) 92 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2) 93 #define OCFS2_CONTROL_HANDSHAKE_VALID (3) 94 95 /* Messages */ 96 #define OCFS2_CONTROL_MESSAGE_OP_LEN 4 97 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN" 98 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14 99 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV" 100 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11 101 #define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN" 102 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47 103 #define OCFS2_TEXT_UUID_LEN 32 104 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2 105 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8 106 #define VERSION_LOCK "version_lock" 107 108 enum ocfs2_connection_type { 109 WITH_CONTROLD, 110 NO_CONTROLD 111 }; 112 113 /* 114 * ocfs2_live_connection is refcounted because the filesystem and 115 * miscdevice sides can detach in different order. Let's just be safe. 116 */ 117 struct ocfs2_live_connection { 118 struct list_head oc_list; 119 struct ocfs2_cluster_connection *oc_conn; 120 enum ocfs2_connection_type oc_type; 121 atomic_t oc_this_node; 122 int oc_our_slot; 123 struct dlm_lksb oc_version_lksb; 124 char oc_lvb[DLM_LVB_LEN]; 125 struct completion oc_sync_wait; 126 wait_queue_head_t oc_wait; 127 }; 128 129 struct ocfs2_control_private { 130 struct list_head op_list; 131 int op_state; 132 int op_this_node; 133 struct ocfs2_protocol_version op_proto; 134 }; 135 136 /* SETN<space><8-char-hex-nodenum><newline> */ 137 struct ocfs2_control_message_setn { 138 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 139 char space; 140 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 141 char newline; 142 }; 143 144 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 145 struct ocfs2_control_message_setv { 146 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 147 char space1; 148 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 149 char space2; 150 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 151 char newline; 152 }; 153 154 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 155 struct ocfs2_control_message_down { 156 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 157 char space1; 158 char uuid[OCFS2_TEXT_UUID_LEN]; 159 char space2; 160 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 161 char newline; 162 }; 163 164 union ocfs2_control_message { 165 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 166 struct ocfs2_control_message_setn u_setn; 167 struct ocfs2_control_message_setv u_setv; 168 struct ocfs2_control_message_down u_down; 169 }; 170 171 static struct ocfs2_stack_plugin ocfs2_user_plugin; 172 173 static atomic_t ocfs2_control_opened; 174 static int ocfs2_control_this_node = -1; 175 static struct ocfs2_protocol_version running_proto; 176 177 static LIST_HEAD(ocfs2_live_connection_list); 178 static LIST_HEAD(ocfs2_control_private_list); 179 static DEFINE_MUTEX(ocfs2_control_lock); 180 181 static inline void ocfs2_control_set_handshake_state(struct file *file, 182 int state) 183 { 184 struct ocfs2_control_private *p = file->private_data; 185 p->op_state = state; 186 } 187 188 static inline int ocfs2_control_get_handshake_state(struct file *file) 189 { 190 struct ocfs2_control_private *p = file->private_data; 191 return p->op_state; 192 } 193 194 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 195 { 196 size_t len = strlen(name); 197 struct ocfs2_live_connection *c; 198 199 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 200 201 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 202 if ((c->oc_conn->cc_namelen == len) && 203 !strncmp(c->oc_conn->cc_name, name, len)) 204 return c; 205 } 206 207 return NULL; 208 } 209 210 /* 211 * ocfs2_live_connection structures are created underneath the ocfs2 212 * mount path. Since the VFS prevents multiple calls to 213 * fill_super(), we can't get dupes here. 214 */ 215 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 216 struct ocfs2_live_connection *c) 217 { 218 int rc = 0; 219 220 mutex_lock(&ocfs2_control_lock); 221 c->oc_conn = conn; 222 223 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 224 list_add(&c->oc_list, &ocfs2_live_connection_list); 225 else { 226 printk(KERN_ERR 227 "ocfs2: Userspace control daemon is not present\n"); 228 rc = -ESRCH; 229 } 230 231 mutex_unlock(&ocfs2_control_lock); 232 return rc; 233 } 234 235 /* 236 * This function disconnects the cluster connection from ocfs2_control. 237 * Afterwards, userspace can't affect the cluster connection. 238 */ 239 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 240 { 241 mutex_lock(&ocfs2_control_lock); 242 list_del_init(&c->oc_list); 243 c->oc_conn = NULL; 244 mutex_unlock(&ocfs2_control_lock); 245 246 kfree(c); 247 } 248 249 static int ocfs2_control_cfu(void *target, size_t target_len, 250 const char __user *buf, size_t count) 251 { 252 /* The T01 expects write(2) calls to have exactly one command */ 253 if ((count != target_len) || 254 (count > sizeof(union ocfs2_control_message))) 255 return -EINVAL; 256 257 if (copy_from_user(target, buf, target_len)) 258 return -EFAULT; 259 260 return 0; 261 } 262 263 static ssize_t ocfs2_control_validate_protocol(struct file *file, 264 const char __user *buf, 265 size_t count) 266 { 267 ssize_t ret; 268 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 269 270 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 271 buf, count); 272 if (ret) 273 return ret; 274 275 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 276 return -EINVAL; 277 278 ocfs2_control_set_handshake_state(file, 279 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 280 281 return count; 282 } 283 284 static void ocfs2_control_send_down(const char *uuid, 285 int nodenum) 286 { 287 struct ocfs2_live_connection *c; 288 289 mutex_lock(&ocfs2_control_lock); 290 291 c = ocfs2_connection_find(uuid); 292 if (c) { 293 BUG_ON(c->oc_conn == NULL); 294 c->oc_conn->cc_recovery_handler(nodenum, 295 c->oc_conn->cc_recovery_data); 296 } 297 298 mutex_unlock(&ocfs2_control_lock); 299 } 300 301 /* 302 * Called whenever configuration elements are sent to /dev/ocfs2_control. 303 * If all configuration elements are present, try to set the global 304 * values. If there is a problem, return an error. Skip any missing 305 * elements, and only bump ocfs2_control_opened when we have all elements 306 * and are successful. 307 */ 308 static int ocfs2_control_install_private(struct file *file) 309 { 310 int rc = 0; 311 int set_p = 1; 312 struct ocfs2_control_private *p = file->private_data; 313 314 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 315 316 mutex_lock(&ocfs2_control_lock); 317 318 if (p->op_this_node < 0) { 319 set_p = 0; 320 } else if ((ocfs2_control_this_node >= 0) && 321 (ocfs2_control_this_node != p->op_this_node)) { 322 rc = -EINVAL; 323 goto out_unlock; 324 } 325 326 if (!p->op_proto.pv_major) { 327 set_p = 0; 328 } else if (!list_empty(&ocfs2_live_connection_list) && 329 ((running_proto.pv_major != p->op_proto.pv_major) || 330 (running_proto.pv_minor != p->op_proto.pv_minor))) { 331 rc = -EINVAL; 332 goto out_unlock; 333 } 334 335 if (set_p) { 336 ocfs2_control_this_node = p->op_this_node; 337 running_proto.pv_major = p->op_proto.pv_major; 338 running_proto.pv_minor = p->op_proto.pv_minor; 339 } 340 341 out_unlock: 342 mutex_unlock(&ocfs2_control_lock); 343 344 if (!rc && set_p) { 345 /* We set the global values successfully */ 346 atomic_inc(&ocfs2_control_opened); 347 ocfs2_control_set_handshake_state(file, 348 OCFS2_CONTROL_HANDSHAKE_VALID); 349 } 350 351 return rc; 352 } 353 354 static int ocfs2_control_get_this_node(void) 355 { 356 int rc; 357 358 mutex_lock(&ocfs2_control_lock); 359 if (ocfs2_control_this_node < 0) 360 rc = -EINVAL; 361 else 362 rc = ocfs2_control_this_node; 363 mutex_unlock(&ocfs2_control_lock); 364 365 return rc; 366 } 367 368 static int ocfs2_control_do_setnode_msg(struct file *file, 369 struct ocfs2_control_message_setn *msg) 370 { 371 long nodenum; 372 char *ptr = NULL; 373 struct ocfs2_control_private *p = file->private_data; 374 375 if (ocfs2_control_get_handshake_state(file) != 376 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 377 return -EINVAL; 378 379 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 380 OCFS2_CONTROL_MESSAGE_OP_LEN)) 381 return -EINVAL; 382 383 if ((msg->space != ' ') || (msg->newline != '\n')) 384 return -EINVAL; 385 msg->space = msg->newline = '\0'; 386 387 nodenum = simple_strtol(msg->nodestr, &ptr, 16); 388 if (!ptr || *ptr) 389 return -EINVAL; 390 391 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 392 (nodenum > INT_MAX) || (nodenum < 0)) 393 return -ERANGE; 394 p->op_this_node = nodenum; 395 396 return ocfs2_control_install_private(file); 397 } 398 399 static int ocfs2_control_do_setversion_msg(struct file *file, 400 struct ocfs2_control_message_setv *msg) 401 { 402 long major, minor; 403 char *ptr = NULL; 404 struct ocfs2_control_private *p = file->private_data; 405 struct ocfs2_protocol_version *max = 406 &ocfs2_user_plugin.sp_max_proto; 407 408 if (ocfs2_control_get_handshake_state(file) != 409 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 410 return -EINVAL; 411 412 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 413 OCFS2_CONTROL_MESSAGE_OP_LEN)) 414 return -EINVAL; 415 416 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 417 (msg->newline != '\n')) 418 return -EINVAL; 419 msg->space1 = msg->space2 = msg->newline = '\0'; 420 421 major = simple_strtol(msg->major, &ptr, 16); 422 if (!ptr || *ptr) 423 return -EINVAL; 424 minor = simple_strtol(msg->minor, &ptr, 16); 425 if (!ptr || *ptr) 426 return -EINVAL; 427 428 /* 429 * The major must be between 1 and 255, inclusive. The minor 430 * must be between 0 and 255, inclusive. The version passed in 431 * must be within the maximum version supported by the filesystem. 432 */ 433 if ((major == LONG_MIN) || (major == LONG_MAX) || 434 (major > (u8)-1) || (major < 1)) 435 return -ERANGE; 436 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 437 (minor > (u8)-1) || (minor < 0)) 438 return -ERANGE; 439 if ((major != max->pv_major) || 440 (minor > max->pv_minor)) 441 return -EINVAL; 442 443 p->op_proto.pv_major = major; 444 p->op_proto.pv_minor = minor; 445 446 return ocfs2_control_install_private(file); 447 } 448 449 static int ocfs2_control_do_down_msg(struct file *file, 450 struct ocfs2_control_message_down *msg) 451 { 452 long nodenum; 453 char *p = NULL; 454 455 if (ocfs2_control_get_handshake_state(file) != 456 OCFS2_CONTROL_HANDSHAKE_VALID) 457 return -EINVAL; 458 459 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 460 OCFS2_CONTROL_MESSAGE_OP_LEN)) 461 return -EINVAL; 462 463 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 464 (msg->newline != '\n')) 465 return -EINVAL; 466 msg->space1 = msg->space2 = msg->newline = '\0'; 467 468 nodenum = simple_strtol(msg->nodestr, &p, 16); 469 if (!p || *p) 470 return -EINVAL; 471 472 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 473 (nodenum > INT_MAX) || (nodenum < 0)) 474 return -ERANGE; 475 476 ocfs2_control_send_down(msg->uuid, nodenum); 477 478 return 0; 479 } 480 481 static ssize_t ocfs2_control_message(struct file *file, 482 const char __user *buf, 483 size_t count) 484 { 485 ssize_t ret; 486 union ocfs2_control_message msg; 487 488 /* Try to catch padding issues */ 489 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 490 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 491 492 memset(&msg, 0, sizeof(union ocfs2_control_message)); 493 ret = ocfs2_control_cfu(&msg, count, buf, count); 494 if (ret) 495 goto out; 496 497 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 498 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 499 OCFS2_CONTROL_MESSAGE_OP_LEN)) 500 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 501 else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 502 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 503 OCFS2_CONTROL_MESSAGE_OP_LEN)) 504 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 505 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 506 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 507 OCFS2_CONTROL_MESSAGE_OP_LEN)) 508 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 509 else 510 ret = -EINVAL; 511 512 out: 513 return ret ? ret : count; 514 } 515 516 static ssize_t ocfs2_control_write(struct file *file, 517 const char __user *buf, 518 size_t count, 519 loff_t *ppos) 520 { 521 ssize_t ret; 522 523 switch (ocfs2_control_get_handshake_state(file)) { 524 case OCFS2_CONTROL_HANDSHAKE_INVALID: 525 ret = -EINVAL; 526 break; 527 528 case OCFS2_CONTROL_HANDSHAKE_READ: 529 ret = ocfs2_control_validate_protocol(file, buf, 530 count); 531 break; 532 533 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 534 case OCFS2_CONTROL_HANDSHAKE_VALID: 535 ret = ocfs2_control_message(file, buf, count); 536 break; 537 538 default: 539 BUG(); 540 ret = -EIO; 541 break; 542 } 543 544 return ret; 545 } 546 547 /* 548 * This is a naive version. If we ever have a new protocol, we'll expand 549 * it. Probably using seq_file. 550 */ 551 static ssize_t ocfs2_control_read(struct file *file, 552 char __user *buf, 553 size_t count, 554 loff_t *ppos) 555 { 556 ssize_t ret; 557 558 ret = simple_read_from_buffer(buf, count, ppos, 559 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 560 561 /* Have we read the whole protocol list? */ 562 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 563 ocfs2_control_set_handshake_state(file, 564 OCFS2_CONTROL_HANDSHAKE_READ); 565 566 return ret; 567 } 568 569 static int ocfs2_control_release(struct inode *inode, struct file *file) 570 { 571 struct ocfs2_control_private *p = file->private_data; 572 573 mutex_lock(&ocfs2_control_lock); 574 575 if (ocfs2_control_get_handshake_state(file) != 576 OCFS2_CONTROL_HANDSHAKE_VALID) 577 goto out; 578 579 if (atomic_dec_and_test(&ocfs2_control_opened)) { 580 if (!list_empty(&ocfs2_live_connection_list)) { 581 /* XXX: Do bad things! */ 582 printk(KERN_ERR 583 "ocfs2: Unexpected release of ocfs2_control!\n" 584 " Loss of cluster connection requires " 585 "an emergency restart!\n"); 586 emergency_restart(); 587 } 588 /* 589 * Last valid close clears the node number and resets 590 * the locking protocol version 591 */ 592 ocfs2_control_this_node = -1; 593 running_proto.pv_major = 0; 594 running_proto.pv_minor = 0; 595 } 596 597 out: 598 list_del_init(&p->op_list); 599 file->private_data = NULL; 600 601 mutex_unlock(&ocfs2_control_lock); 602 603 kfree(p); 604 605 return 0; 606 } 607 608 static int ocfs2_control_open(struct inode *inode, struct file *file) 609 { 610 struct ocfs2_control_private *p; 611 612 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 613 if (!p) 614 return -ENOMEM; 615 p->op_this_node = -1; 616 617 mutex_lock(&ocfs2_control_lock); 618 file->private_data = p; 619 list_add(&p->op_list, &ocfs2_control_private_list); 620 mutex_unlock(&ocfs2_control_lock); 621 622 return 0; 623 } 624 625 static const struct file_operations ocfs2_control_fops = { 626 .open = ocfs2_control_open, 627 .release = ocfs2_control_release, 628 .read = ocfs2_control_read, 629 .write = ocfs2_control_write, 630 .owner = THIS_MODULE, 631 .llseek = default_llseek, 632 }; 633 634 static struct miscdevice ocfs2_control_device = { 635 .minor = MISC_DYNAMIC_MINOR, 636 .name = "ocfs2_control", 637 .fops = &ocfs2_control_fops, 638 }; 639 640 static int ocfs2_control_init(void) 641 { 642 int rc; 643 644 atomic_set(&ocfs2_control_opened, 0); 645 646 rc = misc_register(&ocfs2_control_device); 647 if (rc) 648 printk(KERN_ERR 649 "ocfs2: Unable to register ocfs2_control device " 650 "(errno %d)\n", 651 -rc); 652 653 return rc; 654 } 655 656 static void ocfs2_control_exit(void) 657 { 658 misc_deregister(&ocfs2_control_device); 659 } 660 661 static void fsdlm_lock_ast_wrapper(void *astarg) 662 { 663 struct ocfs2_dlm_lksb *lksb = astarg; 664 int status = lksb->lksb_fsdlm.sb_status; 665 666 /* 667 * For now we're punting on the issue of other non-standard errors 668 * where we can't tell if the unlock_ast or lock_ast should be called. 669 * The main "other error" that's possible is EINVAL which means the 670 * function was called with invalid args, which shouldn't be possible 671 * since the caller here is under our control. Other non-standard 672 * errors probably fall into the same category, or otherwise are fatal 673 * which means we can't carry on anyway. 674 */ 675 676 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 677 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 678 else 679 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 680 } 681 682 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 683 { 684 struct ocfs2_dlm_lksb *lksb = astarg; 685 686 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 687 } 688 689 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 690 int mode, 691 struct ocfs2_dlm_lksb *lksb, 692 u32 flags, 693 void *name, 694 unsigned int namelen) 695 { 696 int ret; 697 698 if (!lksb->lksb_fsdlm.sb_lvbptr) 699 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 700 sizeof(struct dlm_lksb); 701 702 ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 703 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 704 fsdlm_lock_ast_wrapper, lksb, 705 fsdlm_blocking_ast_wrapper); 706 return ret; 707 } 708 709 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 710 struct ocfs2_dlm_lksb *lksb, 711 u32 flags) 712 { 713 int ret; 714 715 ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 716 flags, &lksb->lksb_fsdlm, lksb); 717 return ret; 718 } 719 720 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 721 { 722 return lksb->lksb_fsdlm.sb_status; 723 } 724 725 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 726 { 727 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 728 729 return !invalid; 730 } 731 732 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 733 { 734 if (!lksb->lksb_fsdlm.sb_lvbptr) 735 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 736 sizeof(struct dlm_lksb); 737 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 738 } 739 740 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 741 { 742 } 743 744 static int user_plock(struct ocfs2_cluster_connection *conn, 745 u64 ino, 746 struct file *file, 747 int cmd, 748 struct file_lock *fl) 749 { 750 /* 751 * This more or less just demuxes the plock request into any 752 * one of three dlm calls. 753 * 754 * Internally, fs/dlm will pass these to a misc device, which 755 * a userspace daemon will read and write to. 756 * 757 * For now, cancel requests (which happen internally only), 758 * are turned into unlocks. Most of this function taken from 759 * gfs2_lock. 760 */ 761 762 if (cmd == F_CANCELLK) { 763 cmd = F_SETLK; 764 fl->fl_type = F_UNLCK; 765 } 766 767 if (IS_GETLK(cmd)) 768 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 769 else if (fl->fl_type == F_UNLCK) 770 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 771 else 772 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 773 } 774 775 /* 776 * Compare a requested locking protocol version against the current one. 777 * 778 * If the major numbers are different, they are incompatible. 779 * If the current minor is greater than the request, they are incompatible. 780 * If the current minor is less than or equal to the request, they are 781 * compatible, and the requester should run at the current minor version. 782 */ 783 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 784 struct ocfs2_protocol_version *request) 785 { 786 if (existing->pv_major != request->pv_major) 787 return 1; 788 789 if (existing->pv_minor > request->pv_minor) 790 return 1; 791 792 if (existing->pv_minor < request->pv_minor) 793 request->pv_minor = existing->pv_minor; 794 795 return 0; 796 } 797 798 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 799 { 800 struct ocfs2_protocol_version *pv = 801 (struct ocfs2_protocol_version *)lvb; 802 /* 803 * ocfs2_protocol_version has two u8 variables, so we don't 804 * need any endian conversion. 805 */ 806 ver->pv_major = pv->pv_major; 807 ver->pv_minor = pv->pv_minor; 808 } 809 810 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 811 { 812 struct ocfs2_protocol_version *pv = 813 (struct ocfs2_protocol_version *)lvb; 814 /* 815 * ocfs2_protocol_version has two u8 variables, so we don't 816 * need any endian conversion. 817 */ 818 pv->pv_major = ver->pv_major; 819 pv->pv_minor = ver->pv_minor; 820 } 821 822 static void sync_wait_cb(void *arg) 823 { 824 struct ocfs2_cluster_connection *conn = arg; 825 struct ocfs2_live_connection *lc = conn->cc_private; 826 complete(&lc->oc_sync_wait); 827 } 828 829 static int sync_unlock(struct ocfs2_cluster_connection *conn, 830 struct dlm_lksb *lksb, char *name) 831 { 832 int error; 833 struct ocfs2_live_connection *lc = conn->cc_private; 834 835 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 836 if (error) { 837 printk(KERN_ERR "%s lkid %x error %d\n", 838 name, lksb->sb_lkid, error); 839 return error; 840 } 841 842 wait_for_completion(&lc->oc_sync_wait); 843 844 if (lksb->sb_status != -DLM_EUNLOCK) { 845 printk(KERN_ERR "%s lkid %x status %d\n", 846 name, lksb->sb_lkid, lksb->sb_status); 847 return -1; 848 } 849 return 0; 850 } 851 852 static int sync_lock(struct ocfs2_cluster_connection *conn, 853 int mode, uint32_t flags, 854 struct dlm_lksb *lksb, char *name) 855 { 856 int error, status; 857 struct ocfs2_live_connection *lc = conn->cc_private; 858 859 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 860 name, strlen(name), 861 0, sync_wait_cb, conn, NULL); 862 if (error) { 863 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 864 name, lksb->sb_lkid, flags, mode, error); 865 return error; 866 } 867 868 wait_for_completion(&lc->oc_sync_wait); 869 870 status = lksb->sb_status; 871 872 if (status && status != -EAGAIN) { 873 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 874 name, lksb->sb_lkid, flags, mode, status); 875 } 876 877 return status; 878 } 879 880 881 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 882 int flags) 883 { 884 struct ocfs2_live_connection *lc = conn->cc_private; 885 return sync_lock(conn, mode, flags, 886 &lc->oc_version_lksb, VERSION_LOCK); 887 } 888 889 static int version_unlock(struct ocfs2_cluster_connection *conn) 890 { 891 struct ocfs2_live_connection *lc = conn->cc_private; 892 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 893 } 894 895 /* get_protocol_version() 896 * 897 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 898 * The algorithm is: 899 * 1. Attempt to take the lock in EX mode (non-blocking). 900 * 2. If successful (which means it is the first mount), write the 901 * version number and downconvert to PR lock. 902 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 903 * taking the PR lock. 904 */ 905 906 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 907 { 908 int ret; 909 struct ocfs2_live_connection *lc = conn->cc_private; 910 struct ocfs2_protocol_version pv; 911 912 running_proto.pv_major = 913 ocfs2_user_plugin.sp_max_proto.pv_major; 914 running_proto.pv_minor = 915 ocfs2_user_plugin.sp_max_proto.pv_minor; 916 917 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 918 ret = version_lock(conn, DLM_LOCK_EX, 919 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 920 if (!ret) { 921 conn->cc_version.pv_major = running_proto.pv_major; 922 conn->cc_version.pv_minor = running_proto.pv_minor; 923 version_to_lvb(&running_proto, lc->oc_lvb); 924 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 925 } else if (ret == -EAGAIN) { 926 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 927 if (ret) 928 goto out; 929 lvb_to_version(lc->oc_lvb, &pv); 930 931 if ((pv.pv_major != running_proto.pv_major) || 932 (pv.pv_minor > running_proto.pv_minor)) { 933 ret = -EINVAL; 934 goto out; 935 } 936 937 conn->cc_version.pv_major = pv.pv_major; 938 conn->cc_version.pv_minor = pv.pv_minor; 939 } 940 out: 941 return ret; 942 } 943 944 static void user_recover_prep(void *arg) 945 { 946 } 947 948 static void user_recover_slot(void *arg, struct dlm_slot *slot) 949 { 950 struct ocfs2_cluster_connection *conn = arg; 951 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", 952 slot->nodeid, slot->slot); 953 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 954 955 } 956 957 static void user_recover_done(void *arg, struct dlm_slot *slots, 958 int num_slots, int our_slot, 959 uint32_t generation) 960 { 961 struct ocfs2_cluster_connection *conn = arg; 962 struct ocfs2_live_connection *lc = conn->cc_private; 963 int i; 964 965 for (i = 0; i < num_slots; i++) 966 if (slots[i].slot == our_slot) { 967 atomic_set(&lc->oc_this_node, slots[i].nodeid); 968 break; 969 } 970 971 lc->oc_our_slot = our_slot; 972 wake_up(&lc->oc_wait); 973 } 974 975 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 976 .recover_prep = user_recover_prep, 977 .recover_slot = user_recover_slot, 978 .recover_done = user_recover_done, 979 }; 980 981 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 982 { 983 version_unlock(conn); 984 dlm_release_lockspace(conn->cc_lockspace, 2); 985 conn->cc_lockspace = NULL; 986 ocfs2_live_connection_drop(conn->cc_private); 987 conn->cc_private = NULL; 988 return 0; 989 } 990 991 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 992 { 993 dlm_lockspace_t *fsdlm; 994 struct ocfs2_live_connection *lc; 995 int rc, ops_rv; 996 997 BUG_ON(conn == NULL); 998 999 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 1000 if (!lc) 1001 return -ENOMEM; 1002 1003 init_waitqueue_head(&lc->oc_wait); 1004 init_completion(&lc->oc_sync_wait); 1005 atomic_set(&lc->oc_this_node, 0); 1006 conn->cc_private = lc; 1007 lc->oc_type = NO_CONTROLD; 1008 1009 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 1010 DLM_LSFL_FS, DLM_LVB_LEN, 1011 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); 1012 if (rc) 1013 goto out; 1014 1015 if (ops_rv == -EOPNOTSUPP) { 1016 lc->oc_type = WITH_CONTROLD; 1017 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 1018 "version of dlm_controld and/or ocfs2-tools." 1019 " Please consider upgrading.\n"); 1020 } else if (ops_rv) { 1021 rc = ops_rv; 1022 goto out; 1023 } 1024 conn->cc_lockspace = fsdlm; 1025 1026 rc = ocfs2_live_connection_attach(conn, lc); 1027 if (rc) 1028 goto out; 1029 1030 if (lc->oc_type == NO_CONTROLD) { 1031 rc = get_protocol_version(conn); 1032 if (rc) { 1033 printk(KERN_ERR "ocfs2: Could not determine" 1034 " locking version\n"); 1035 user_cluster_disconnect(conn); 1036 goto out; 1037 } 1038 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1039 } 1040 1041 /* 1042 * running_proto must have been set before we allowed any mounts 1043 * to proceed. 1044 */ 1045 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1046 printk(KERN_ERR 1047 "Unable to mount with fs locking protocol version " 1048 "%u.%u because negotiated protocol is %u.%u\n", 1049 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1050 running_proto.pv_major, running_proto.pv_minor); 1051 rc = -EPROTO; 1052 ocfs2_live_connection_drop(lc); 1053 lc = NULL; 1054 } 1055 1056 out: 1057 if (rc) 1058 kfree(lc); 1059 return rc; 1060 } 1061 1062 1063 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1064 unsigned int *this_node) 1065 { 1066 int rc; 1067 struct ocfs2_live_connection *lc = conn->cc_private; 1068 1069 if (lc->oc_type == WITH_CONTROLD) 1070 rc = ocfs2_control_get_this_node(); 1071 else if (lc->oc_type == NO_CONTROLD) 1072 rc = atomic_read(&lc->oc_this_node); 1073 else 1074 rc = -EINVAL; 1075 1076 if (rc < 0) 1077 return rc; 1078 1079 *this_node = rc; 1080 return 0; 1081 } 1082 1083 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1084 .connect = user_cluster_connect, 1085 .disconnect = user_cluster_disconnect, 1086 .this_node = user_cluster_this_node, 1087 .dlm_lock = user_dlm_lock, 1088 .dlm_unlock = user_dlm_unlock, 1089 .lock_status = user_dlm_lock_status, 1090 .lvb_valid = user_dlm_lvb_valid, 1091 .lock_lvb = user_dlm_lvb, 1092 .plock = user_plock, 1093 .dump_lksb = user_dlm_dump_lksb, 1094 }; 1095 1096 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1097 .sp_name = "user", 1098 .sp_ops = &ocfs2_user_plugin_ops, 1099 .sp_owner = THIS_MODULE, 1100 }; 1101 1102 1103 static int __init ocfs2_user_plugin_init(void) 1104 { 1105 int rc; 1106 1107 rc = ocfs2_control_init(); 1108 if (!rc) { 1109 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1110 if (rc) 1111 ocfs2_control_exit(); 1112 } 1113 1114 return rc; 1115 } 1116 1117 static void __exit ocfs2_user_plugin_exit(void) 1118 { 1119 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1120 ocfs2_control_exit(); 1121 } 1122 1123 MODULE_AUTHOR("Oracle"); 1124 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks"); 1125 MODULE_LICENSE("GPL"); 1126 module_init(ocfs2_user_plugin_init); 1127 module_exit(ocfs2_user_plugin_exit); 1128