/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock.
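 * Returns the last page of the chain; if @len is non-NULL, the number of
 * pages in the chain is stored there as well.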
 */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_epoch_entry *e;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		e = list_entry(le, struct drbd_epoch_entry, w.list);
		if (drbd_ee_has_active_page(e))
			break;
		list_move(le, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_epoch_entry *e, *t;

	spin_lock_irq(&mdev->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &reclaimed, w.list)
		drbd_free_net_ee(mdev, e);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
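 * If @retry is false, or the process is interrupted by a signal while
 * waiting, NULL may be returned even though @number pages were requested.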
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ?
"pp_in_use_by_net" : "pp_in_use", i); 297 wake_up(&drbd_pp_wait); 298 } 299 300 /* 301 You need to hold the req_lock: 302 _drbd_wait_ee_list_empty() 303 304 You must not have the req_lock: 305 drbd_free_ee() 306 drbd_alloc_ee() 307 drbd_init_ee() 308 drbd_release_ee() 309 drbd_ee_fix_bhs() 310 drbd_process_done_ee() 311 drbd_clear_done_ee() 312 drbd_wait_ee_list_empty() 313 */ 314 315 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, 316 u64 id, 317 sector_t sector, 318 unsigned int data_size, 319 gfp_t gfp_mask) __must_hold(local) 320 { 321 struct drbd_epoch_entry *e; 322 struct page *page = NULL; 323 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; 324 325 if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE)) 326 return NULL; 327 328 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 329 if (!e) { 330 if (!(gfp_mask & __GFP_NOWARN)) 331 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n"); 332 return NULL; 333 } 334 335 if (data_size) { 336 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT)); 337 if (!page) 338 goto fail; 339 } 340 341 INIT_HLIST_NODE(&e->collision); 342 e->epoch = NULL; 343 e->mdev = mdev; 344 e->pages = page; 345 atomic_set(&e->pending_bios, 0); 346 e->size = data_size; 347 e->flags = 0; 348 e->sector = sector; 349 e->block_id = id; 350 351 return e; 352 353 fail: 354 mempool_free(e, drbd_ee_mempool); 355 return NULL; 356 } 357 358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net) 359 { 360 if (e->flags & EE_HAS_DIGEST) 361 kfree(e->digest); 362 drbd_pp_free(mdev, e->pages, is_net); 363 D_ASSERT(atomic_read(&e->pending_bios) == 0); 364 D_ASSERT(hlist_unhashed(&e->collision)); 365 mempool_free(e, drbd_ee_mempool); 366 } 367 368 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list) 369 { 370 LIST_HEAD(work_list); 371 struct drbd_epoch_entry *e, *t; 372 int count = 0; 373 int is_net = list == &mdev->net_ee; 374 375 spin_lock_irq(&mdev->req_lock); 376 list_splice_init(list, &work_list); 377 spin_unlock_irq(&mdev->req_lock); 378 379 list_for_each_entry_safe(e, t, &work_list, w.list) { 380 drbd_free_some_ee(mdev, e, is_net); 381 count++; 382 } 383 return count; 384 } 385 386 387 /* 388 * This function is called from _asender only_ 389 * but see also comments in _req_mod(,barrier_acked) 390 * and receive_Barrier. 391 * 392 * Move entries from net_ee to done_ee, if ready. 393 * Grab done_ee, call all callbacks, free the entries. 394 * The callbacks typically send out ACKs. 395 */ 396 static int drbd_process_done_ee(struct drbd_conf *mdev) 397 { 398 LIST_HEAD(work_list); 399 LIST_HEAD(reclaimed); 400 struct drbd_epoch_entry *e, *t; 401 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS); 402 403 spin_lock_irq(&mdev->req_lock); 404 reclaim_net_ee(mdev, &reclaimed); 405 list_splice_init(&mdev->done_ee, &work_list); 406 spin_unlock_irq(&mdev->req_lock); 407 408 list_for_each_entry_safe(e, t, &reclaimed, w.list) 409 drbd_free_net_ee(mdev, e); 410 411 /* possible callbacks here: 412 * e_end_block, and e_end_resync_block, e_send_discard_ack. 413 * all ignore the last argument. 
	 */
	list_for_each_entry_safe(e, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = e->w.cb(mdev, &e->w, !ok) && ok;
		drbd_free_ee(mdev, e);
	}
	wake_up(&mdev->ee_wait);

	return ok;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->req_lock);
		io_schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->req_lock);
	}
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->req_lock);
}

/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
		struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;
	__module_get((*newsock)->ops->owner);

out:
	return err;
}

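/* Receive up to @size bytes into @buf on the given socket.
 * Unlike drbd_recv() below, a short or failed read here does not force the
 * connection into C_BROKEN_PIPE; this is used for the very first handshake
 * packet and for probing a socket in drbd_socket_okay(). */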
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
		    void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on sock) we got a signal
		 */

		if (rv < 0) {
			if (rv == -ECONNRESET)
				dev_info(DEV, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			dev_info(DEV, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	}

	set_fs(oldfs);

	if (rv != size)
		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

	return rv;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, mdev->net_conf->my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting!
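	 * (with disconnect_on_error cleared here, a failed connect() below
	 *  will only log and let us retry, instead of forcing C_DISCONNECTING)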
	 */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			dev_err(DEV, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}
	put_net_conf(mdev);
	return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = mdev->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
			      (struct sockaddr *) mdev->net_conf->my_addr,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(mdev, &what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			dev_err(DEV, "%s failed, err = %d\n", what, err);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		}
	}
	put_net_conf(mdev);

	return s_estab;
}

static int drbd_send_fp(struct drbd_conf *mdev,
	struct socket *sock, enum drbd_packets cmd)
{
	struct p_header80 *h = &mdev->data.sbuf.header.h80;

	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
	struct p_header80 *h = &mdev->data.rbuf.header.h80;
	int rr;

	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
		return be16_to_cpu(h->command);

	return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:	DRBD device.
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
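 *
 * Sets up both the data socket and the meta-data socket (msock), either by
 * actively connecting or by accepting the peer's connect, then runs the
 * handshake and the optional cram-hmac authentication.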
 */
static int drbd_connect(struct drbd_conf *mdev)
{
	struct socket *s, *sock, *msock;
	int try, h, ok;
	enum drbd_state_rv rv;

	D_ASSERT(!mdev->data.socket);

	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &mdev->flags);

	sock  = NULL;
	msock = NULL;

	do {
		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(mdev);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);
		}

		if (s) {
			if (!sock) {
				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
				sock = s;
				s = NULL;
			} else if (!msock) {
				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
				msock = s;
				s = NULL;
			} else {
				dev_err(DEV, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock && msock) {
			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(mdev);
		if (s) {
			try = drbd_recv_fp(mdev, s);
			drbd_socket_okay(mdev, &sock);
			drbd_socket_okay(mdev, &msock);
			switch (try) {
			case P_HAND_SHAKE_S:
				if (sock) {
					dev_warn(DEV, "initial packet S crossed\n");
					sock_release(sock);
				}
				sock = s;
				break;
			case P_HAND_SHAKE_M:
				if (msock) {
					dev_warn(DEV, "initial packet M crossed\n");
					sock_release(msock);
				}
				msock = s;
				set_bit(DISCARD_CONCURRENT, &mdev->flags);
				break;
			default:
				dev_warn(DEV, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (mdev->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&mdev->receiver) == Exiting)
				goto out_release_sockets;
		}

		if (sock && msock) {
			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;
			if (ok)
				break;
		}
	} while (1);

	msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	mdev->data.socket = sock;
	mdev->meta.socket = msock;
	mdev->last_received = jiffies;

	D_ASSERT(mdev->asender.task == NULL);

	h = drbd_do_handshake(mdev);
	if (h <= 0)
		return h;

	if (mdev->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(mdev)) {
		case -1:
			dev_err(DEV, "Authentication of peer failed\n");
			return -1;
		case 0:
			dev_err(DEV, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	if (drbd_send_protocol(mdev) == -1)
		return -1;
	set_bit(STATE_SENT, &mdev->flags);
	drbd_send_sync_param(mdev, &mdev->sync_conf);
	drbd_send_sizes(mdev, 0, 0);
	drbd_send_uuids(mdev);
	drbd_send_current_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

	spin_lock_irq(&mdev->req_lock);
	rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
	if (mdev->state.conn != C_WF_REPORT_PARAMS)
		clear_bit(STATE_SENT, &mdev->flags);
	spin_unlock_irq(&mdev->req_lock);

	if (rv < SS_SUCCESS)
		return 0;

	drbd_thread_start(&mdev->asender);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */

	return 1;

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}

static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
{
	union p_header *h = &mdev->data.rbuf.header;
	int r;

	r = drbd_recv(mdev, h, sizeof(*h));
	if (unlikely(r != sizeof(*h))) {
		if (!signal_pending(current))
			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
		return false;
	}

	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
		*cmd = be16_to_cpu(h->h80.command);
		*packet_size = be16_to_cpu(h->h80.length);
	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
		*cmd = be16_to_cpu(h->h95.command);
		*packet_size = be32_to_cpu(h->h95.length);
	} else {
		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
		    be32_to_cpu(h->h80.magic),
		    be16_to_cpu(h->h80.command),
		    be16_to_cpu(h->h80.length));
		return false;
	}
	mdev->last_received = jiffies;

	return true;
}

static void drbd_flush(struct drbd_conf *mdev)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL);
		if (rv) {
			dev_info(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
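 *
 * Returns FE_STILL_LIVE if the epoch stays around, FE_RECYCLED if it was
 * reset for reuse as the current epoch, or FE_DESTROYED if it was freed.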
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
				wake_up(&mdev->ee_wait);
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_ee()
 * @mdev:	DRBD device.
 * @e:		epoch entry
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set.
 */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
		const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = e->pages;
	sector_t sector = e->sector;
	unsigned ds = e->size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > e->sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = e;
	bio->bi_end_io = drbd_endio_sec;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&e->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
	int rv;
	struct p_barrier *p = &mdev->data.rbuf.barrier;
	struct drbd_epoch *epoch;

	inc_unacked(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return true;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		drbd_flush(mdev);

		if (atomic_read(&mdev->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		epoch = mdev->current_epoch;
		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

		D_ASSERT(atomic_read(&epoch->active) == 0);
		D_ASSERT(epoch->flags == 0);

		return true;
	default:
		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
		return false;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return true;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_epoch_entry *e;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data digest: read %d expected %d\n",
					rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	ERR_IF(data_size & 0x1ff) return NULL;
	ERR_IF(data_size > DRBD_MAX_BIO_SIZE) return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.
	 */
	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!e)
		return NULL;

	if (!data_size)
		return e;

	ds = data_size;
	page = e->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, e);
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
				rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, e);
			drbd_free_ee(mdev, e);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return e;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, rv = 1;
	void *data;

	if (!data_size)
		return true;

	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			rv = 0;
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data: read %d expected %d\n",
					rr, min_t(int, data_size, PAGE_SIZE));
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page, 0);
	return rv;
}

static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev, dig_in, dgs);
		if (rr != dgs) {
			if (!signal_pending(current))
				dev_warn(DEV,
					"short read receiving data reply digest: read %d expected %d\n",
					rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			if (!signal_pending(current))
				dev_warn(DEV, "short read receiving data reply: "
					"read %d expected %d\n",
					rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}

/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	sector_t sector = e->sector;
	int ok;

	D_ASSERT(hlist_unhashed(&e->collision));

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, e->size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, e->size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, e);
	}
	dec_unacked(mdev);

	return ok;
}

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_epoch_entry *e;

	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!e)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	e->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
fail:
	put_ldev(mdev);
	return false;
}

static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
	struct drbd_request *req;
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->data.rbuf.data;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->req_lock);
	req = _ar_id_to_req(mdev, p->block_id, sector);
	spin_unlock_irq(&mdev->req_lock);
	if (unlikely(!req)) {
		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
		return false;
	}

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	ok = recv_dless_read(mdev, req, sector, data_size);

	if (ok)
		req_mod(req, data_received);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return ok;
}

static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
	sector_t sector;
	int ok;
	struct p_data *p = &mdev->data.rbuf.data;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_endio_write_sec.
		 */
		ok = recv_resync_read(mdev, sector, data_size);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		ok = drbd_drain_block(mdev, data_size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
	}

	atomic_add(data_size >> 9, &mdev->rs_sect_in);

	return ok;
}

/* e_end_block() is called via drbd_process_done_ee().
 * this means this function only runs in the asender thread
 */
static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	sector_t sector = e->sector;
	int ok = 1, pcmd;

	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				e->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			ok &= drbd_send_ack(mdev, pcmd, e);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, e->size);
		} else {
			ok = drbd_send_ack(mdev, P_NEG_ACK, e);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(mdev);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (mdev->net_conf->two_primaries) {
		spin_lock_irq(&mdev->req_lock);
		D_ASSERT(!hlist_unhashed(&e->collision));
		hlist_del_init(&e->collision);
		spin_unlock_irq(&mdev->req_lock);
	} else {
		D_ASSERT(hlist_unhashed(&e->collision));
	}

	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return ok;
}

static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	int ok = 1;

	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);

	spin_lock_irq(&mdev->req_lock);
	D_ASSERT(!hlist_unhashed(&e->collision));
	hlist_del_init(&e->collision);
	spin_unlock_irq(&mdev->req_lock);

	dec_unacked(mdev);

	return ok;
}

static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
{
	struct drbd_epoch_entry *rs_e;
	bool rv = 0;

	spin_lock_irq(&mdev->req_lock);
	list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
		if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
			rv = 1;
			break;
		}
	}
	spin_unlock_irq(&mdev->req_lock);

	return rv;
}

/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
{
	DEFINE_WAIT(wait);
	unsigned int p_seq;
	long timeout;
	int ret = 0;
	spin_lock(&mdev->peer_seq_lock);
	for (;;) {
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		if (seq_le(packet_seq, mdev->peer_seq+1))
			break;
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		p_seq = mdev->peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		timeout = schedule_timeout(30*HZ);
		spin_lock(&mdev->peer_seq_lock);
		if (timeout == 0 && p_seq == mdev->peer_seq) {
			ret = -ETIMEDOUT;
			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
			break;
		}
	}
	finish_wait(&mdev->seq_wait, &wait);
	if (mdev->peer_seq+1 == packet_seq)
		mdev->peer_seq++;
	spin_unlock(&mdev->peer_seq_lock);
	return ret;
}

/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}

/* mirrored write */
static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
	sector_t sector;
	struct drbd_epoch_entry *e;
	struct p_data *p = &mdev->data.rbuf.data;
	int rw = WRITE;
	u32 dp_flags;

	if (!get_ldev(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
			mdev->peer_seq++;
		spin_unlock(&mdev->peer_seq_lock);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
		atomic_inc(&mdev->current_epoch->epoch_size);
		return drbd_drain_block(mdev, data_size);
	}

	/* get_ldev(mdev) successful.
	 * Corresponding put_ldev done either below (on various errors),
	 * or in drbd_endio_write_sec, if we successfully submit the data at
	 * the end of this function. */

	sector = be64_to_cpu(p->sector);
	e = read_in_block(mdev, p->block_id, sector, data_size);
	if (!e) {
		put_ldev(mdev);
		return false;
	}

	e->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);
	if (e->pages == NULL) {
		D_ASSERT(e->size == 0);
		D_ASSERT(dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		e->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&mdev->epoch_lock);
	e->epoch = mdev->current_epoch;
	atomic_inc(&e->epoch->epoch_size);
	atomic_inc(&e->epoch->active);
	spin_unlock(&mdev->epoch_lock);

	/* I'm the receiver, I do hold a net_cnt reference.
	 */
	if (!mdev->net_conf->two_primaries) {
		spin_lock_irq(&mdev->req_lock);
	} else {
		/* don't get the req_lock yet,
		 * we may sleep in drbd_wait_peer_seq */
		const int size = e->size;
		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
		DEFINE_WAIT(wait);
		struct drbd_request *i;
		struct hlist_node *n;
		struct hlist_head *slot;
		int first;

		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
		BUG_ON(mdev->ee_hash == NULL);
		BUG_ON(mdev->tl_hash == NULL);

		/* conflict detection and handling:
		 * 1. wait on the sequence number,
		 *    in case this data packet overtook ACK packets.
		 * 2. check our hash tables for conflicting requests.
		 *    we only need to walk the tl_hash, since an ee can not
		 *    have a conflict with another ee: on the submitting
		 *    node, the corresponding req had already been conflicting,
		 *    and a conflicting req is never sent.
		 *
		 * Note: for two_primaries, we are protocol C,
		 * so there cannot be any request that is DONE
		 * but still on the transfer log.
		 *
		 * unconditionally add to the ee_hash.
		 *
		 * if no conflicting request is found:
		 *    submit.
		 *
		 * if any conflicting request is found
		 * that has not yet been acked,
		 * AND I have the "discard concurrent writes" flag:
		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
		 *
		 * if any conflicting request is found:
		 *	 block the receiver, waiting on misc_wait
		 *	 until no more conflicting requests are there,
		 *	 or we get interrupted (disconnect).
		 *
		 *	 we do not just write after local io completion of those
		 *	 requests, but only after req is done completely, i.e.
		 *	 we wait for the P_DISCARD_ACK to arrive!
		 *
		 *	 then proceed normally, i.e. submit.
		 */
		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
			goto out_interrupted;

		spin_lock_irq(&mdev->req_lock);

		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));

#define OVERLAPS overlaps(i->sector, i->size, sector, size)
		slot = tl_hash_slot(mdev, sector);
		first = 1;
		for (;;) {
			int have_unacked = 0;
			int have_conflict = 0;
			prepare_to_wait(&mdev->misc_wait, &wait,
				TASK_INTERRUPTIBLE);
			hlist_for_each_entry(i, n, slot, collision) {
				if (OVERLAPS) {
					/* only ALERT on first iteration,
					 * we may be woken up early... */
					if (first)
						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
						      " new: %llus +%u; pending: %llus +%u\n",
						      current->comm, current->pid,
						      (unsigned long long)sector, size,
						      (unsigned long long)i->sector, i->size);
					if (i->rq_state & RQ_NET_PENDING)
						++have_unacked;
					++have_conflict;
				}
			}
#undef OVERLAPS
			if (!have_conflict)
				break;

			/* Discard Ack only for the _first_ iteration */
			if (first && discard && have_unacked) {
				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
				     (unsigned long long)sector);
				inc_unacked(mdev);
				e->w.cb = e_send_discard_ack;
				list_add_tail(&e->w.list, &mdev->done_ee);

				spin_unlock_irq(&mdev->req_lock);

				/* we could probably send that P_DISCARD_ACK ourselves,
				 * but I don't like the receiver using the msock */

				put_ldev(mdev);
				wake_asender(mdev);
				finish_wait(&mdev->misc_wait, &wait);
				return true;
			}

			if (signal_pending(current)) {
				hlist_del_init(&e->collision);

				spin_unlock_irq(&mdev->req_lock);

				finish_wait(&mdev->misc_wait, &wait);
				goto out_interrupted;
			}

			spin_unlock_irq(&mdev->req_lock);
			if (first) {
				first = 0;
				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
				     "sec=%llus\n", (unsigned long long)sector);
			} else if (discard) {
				/* we had none on the first iteration.
				 * there must be none now. */
				D_ASSERT(have_unacked == 0);
			}
			schedule();
			spin_lock_irq(&mdev->req_lock);
		}
		finish_wait(&mdev->misc_wait, &wait);
	}

	list_add(&e->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->req_lock);

	if (mdev->state.conn == C_SYNC_TARGET)
		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));

	switch (mdev->net_conf->wire_protocol) {
	case DRBD_PROT_C:
		inc_unacked(mdev);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
		break;
	case DRBD_PROT_B:
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, e);
		break;
	case DRBD_PROT_A:
		/* nothing to do */
		break;
	}

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, e->sector, e->size);
		e->flags |= EE_CALL_AL_COMPLETE_IO;
		e->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, e->sector);
	}

	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
		return true;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	hlist_del_init(&e->collision);
	spin_unlock_irq(&mdev->req_lock);
	if (e->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, e->sector);

out_interrupted:
	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(mdev);
	drbd_free_ee(mdev, e);
	return false;
}

/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
{
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	int curr_events;
	int throttle = 0;

	/* feature disabled?
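	 * (c_min_rate == 0 disables resync throttling entirely)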
	 */
	if (mdev->sync_conf.c_min_rate == 0)
		return 0;

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
		else
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > mdev->sync_conf.c_min_rate)
			throttle = 1;
	}
	return throttle;
}


static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
{
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_epoch_entry *e;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = &mdev->data.rbuf.block_req;

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return false;
	}

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		verb = 1;
		switch (cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
				cmdname(cmd));
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(mdev, digest_size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.
*/ 2040 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO); 2041 if (!e) { 2042 put_ldev(mdev); 2043 return false; 2044 } 2045 2046 switch (cmd) { 2047 case P_DATA_REQUEST: 2048 e->w.cb = w_e_end_data_req; 2049 fault_type = DRBD_FAULT_DT_RD; 2050 /* application IO, don't drbd_rs_begin_io */ 2051 goto submit; 2052 2053 case P_RS_DATA_REQUEST: 2054 e->w.cb = w_e_end_rsdata_req; 2055 fault_type = DRBD_FAULT_RS_RD; 2056 /* used in the sector offset progress display */ 2057 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 2058 break; 2059 2060 case P_OV_REPLY: 2061 case P_CSUM_RS_REQUEST: 2062 fault_type = DRBD_FAULT_RS_RD; 2063 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO); 2064 if (!di) 2065 goto out_free_e; 2066 2067 di->digest_size = digest_size; 2068 di->digest = (((char *)di)+sizeof(struct digest_info)); 2069 2070 e->digest = di; 2071 e->flags |= EE_HAS_DIGEST; 2072 2073 if (drbd_recv(mdev, di->digest, digest_size) != digest_size) 2074 goto out_free_e; 2075 2076 if (cmd == P_CSUM_RS_REQUEST) { 2077 D_ASSERT(mdev->agreed_pro_version >= 89); 2078 e->w.cb = w_e_end_csum_rs_req; 2079 /* used in the sector offset progress display */ 2080 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 2081 } else if (cmd == P_OV_REPLY) { 2082 /* track progress, we may need to throttle */ 2083 atomic_add(size >> 9, &mdev->rs_sect_in); 2084 e->w.cb = w_e_end_ov_reply; 2085 dec_rs_pending(mdev); 2086 /* drbd_rs_begin_io done when we sent this request, 2087 * but accounting still needs to be done. */ 2088 goto submit_for_resync; 2089 } 2090 break; 2091 2092 case P_OV_REQUEST: 2093 if (mdev->ov_start_sector == ~(sector_t)0 && 2094 mdev->agreed_pro_version >= 90) { 2095 unsigned long now = jiffies; 2096 int i; 2097 mdev->ov_start_sector = sector; 2098 mdev->ov_position = sector; 2099 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector); 2100 mdev->rs_total = mdev->ov_left; 2101 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2102 mdev->rs_mark_left[i] = mdev->ov_left; 2103 mdev->rs_mark_time[i] = now; 2104 } 2105 dev_info(DEV, "Online Verify start sector: %llu\n", 2106 (unsigned long long)sector); 2107 } 2108 e->w.cb = w_e_end_ov_req; 2109 fault_type = DRBD_FAULT_RS_RD; 2110 break; 2111 2112 default: 2113 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n", 2114 cmdname(cmd)); 2115 fault_type = DRBD_FAULT_MAX; 2116 goto out_free_e; 2117 } 2118 2119 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2120 * wrt the receiver, but it is not as straightforward as it may seem. 2121 * Various places in the resync start and stop logic assume resync 2122 * requests are processed in order, requeuing this on the worker thread 2123 * introduces a bunch of new code for synchronization between threads. 2124 * 2125 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2126 * "forever", throttling after drbd_rs_begin_io will lock that extent 2127 * for application writes for the same time. For now, just throttle 2128 * here, where the rest of the code expects the receiver to sleep for 2129 * a while, anyways. 2130 */ 2131 2132 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2133 * this defers syncer requests for some time, before letting at least 2134 * on request through. The resync controller on the receiving side 2135 * will adapt to the incoming rate accordingly. 2136 * 2137 * We cannot throttle here if remote is Primary/SyncTarget: 2138 * we would also throttle its application reads. 2139 * In that case, throttling is done on the SyncTarget only. 
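 * Hence the peer-role check below: the short delay is skipped entirely
 * whenever the peer is Primary.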
2140 */ 2141 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector)) 2142 schedule_timeout_uninterruptible(HZ/10); 2143 if (drbd_rs_begin_io(mdev, sector)) 2144 goto out_free_e; 2145 2146 submit_for_resync: 2147 atomic_add(size >> 9, &mdev->rs_sect_ev); 2148 2149 submit: 2150 inc_unacked(mdev); 2151 spin_lock_irq(&mdev->req_lock); 2152 list_add_tail(&e->w.list, &mdev->read_ee); 2153 spin_unlock_irq(&mdev->req_lock); 2154 2155 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) 2156 return true; 2157 2158 /* don't care for the reason here */ 2159 dev_err(DEV, "submit failed, triggering re-connect\n"); 2160 spin_lock_irq(&mdev->req_lock); 2161 list_del(&e->w.list); 2162 spin_unlock_irq(&mdev->req_lock); 2163 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2164 2165 out_free_e: 2166 put_ldev(mdev); 2167 drbd_free_ee(mdev, e); 2168 return false; 2169 } 2170 2171 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local) 2172 { 2173 int self, peer, rv = -100; 2174 unsigned long ch_self, ch_peer; 2175 2176 self = mdev->ldev->md.uuid[UI_BITMAP] & 1; 2177 peer = mdev->p_uuid[UI_BITMAP] & 1; 2178 2179 ch_peer = mdev->p_uuid[UI_SIZE]; 2180 ch_self = mdev->comm_bm_set; 2181 2182 switch (mdev->net_conf->after_sb_0p) { 2183 case ASB_CONSENSUS: 2184 case ASB_DISCARD_SECONDARY: 2185 case ASB_CALL_HELPER: 2186 dev_err(DEV, "Configuration error.\n"); 2187 break; 2188 case ASB_DISCONNECT: 2189 break; 2190 case ASB_DISCARD_YOUNGER_PRI: 2191 if (self == 0 && peer == 1) { 2192 rv = -1; 2193 break; 2194 } 2195 if (self == 1 && peer == 0) { 2196 rv = 1; 2197 break; 2198 } 2199 /* Else fall through to one of the other strategies... */ 2200 case ASB_DISCARD_OLDER_PRI: 2201 if (self == 0 && peer == 1) { 2202 rv = 1; 2203 break; 2204 } 2205 if (self == 1 && peer == 0) { 2206 rv = -1; 2207 break; 2208 } 2209 /* Else fall through to one of the other strategies... */ 2210 dev_warn(DEV, "Discard younger/older primary did not find a decision\n" 2211 "Using discard-least-changes instead\n"); 2212 case ASB_DISCARD_ZERO_CHG: 2213 if (ch_peer == 0 && ch_self == 0) { 2214 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags) 2215 ? -1 : 1; 2216 break; 2217 } else { 2218 if (ch_peer == 0) { rv = 1; break; } 2219 if (ch_self == 0) { rv = -1; break; } 2220 } 2221 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG) 2222 break; 2223 case ASB_DISCARD_LEAST_CHG: 2224 if (ch_self < ch_peer) 2225 rv = -1; 2226 else if (ch_self > ch_peer) 2227 rv = 1; 2228 else /* ( ch_self == ch_peer ) */ 2229 /* Well, then use something else. */ 2230 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags) 2231 ? 
-1 : 1; 2232 break; 2233 case ASB_DISCARD_LOCAL: 2234 rv = -1; 2235 break; 2236 case ASB_DISCARD_REMOTE: 2237 rv = 1; 2238 } 2239 2240 return rv; 2241 } 2242 2243 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local) 2244 { 2245 int hg, rv = -100; 2246 2247 switch (mdev->net_conf->after_sb_1p) { 2248 case ASB_DISCARD_YOUNGER_PRI: 2249 case ASB_DISCARD_OLDER_PRI: 2250 case ASB_DISCARD_LEAST_CHG: 2251 case ASB_DISCARD_LOCAL: 2252 case ASB_DISCARD_REMOTE: 2253 dev_err(DEV, "Configuration error.\n"); 2254 break; 2255 case ASB_DISCONNECT: 2256 break; 2257 case ASB_CONSENSUS: 2258 hg = drbd_asb_recover_0p(mdev); 2259 if (hg == -1 && mdev->state.role == R_SECONDARY) 2260 rv = hg; 2261 if (hg == 1 && mdev->state.role == R_PRIMARY) 2262 rv = hg; 2263 break; 2264 case ASB_VIOLENTLY: 2265 rv = drbd_asb_recover_0p(mdev); 2266 break; 2267 case ASB_DISCARD_SECONDARY: 2268 return mdev->state.role == R_PRIMARY ? 1 : -1; 2269 case ASB_CALL_HELPER: 2270 hg = drbd_asb_recover_0p(mdev); 2271 if (hg == -1 && mdev->state.role == R_PRIMARY) { 2272 enum drbd_state_rv rv2; 2273 2274 drbd_set_role(mdev, R_SECONDARY, 0); 2275 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2276 * we might be here in C_WF_REPORT_PARAMS which is transient. 2277 * we do not need to wait for the after state change work either. */ 2278 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY)); 2279 if (rv2 != SS_SUCCESS) { 2280 drbd_khelper(mdev, "pri-lost-after-sb"); 2281 } else { 2282 dev_warn(DEV, "Successfully gave up primary role.\n"); 2283 rv = hg; 2284 } 2285 } else 2286 rv = hg; 2287 } 2288 2289 return rv; 2290 } 2291 2292 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local) 2293 { 2294 int hg, rv = -100; 2295 2296 switch (mdev->net_conf->after_sb_2p) { 2297 case ASB_DISCARD_YOUNGER_PRI: 2298 case ASB_DISCARD_OLDER_PRI: 2299 case ASB_DISCARD_LEAST_CHG: 2300 case ASB_DISCARD_LOCAL: 2301 case ASB_DISCARD_REMOTE: 2302 case ASB_CONSENSUS: 2303 case ASB_DISCARD_SECONDARY: 2304 dev_err(DEV, "Configuration error.\n"); 2305 break; 2306 case ASB_VIOLENTLY: 2307 rv = drbd_asb_recover_0p(mdev); 2308 break; 2309 case ASB_DISCONNECT: 2310 break; 2311 case ASB_CALL_HELPER: 2312 hg = drbd_asb_recover_0p(mdev); 2313 if (hg == -1) { 2314 enum drbd_state_rv rv2; 2315 2316 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2317 * we might be here in C_WF_REPORT_PARAMS which is transient. 2318 * we do not need to wait for the after state change work either. 
*/ 2319 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY)); 2320 if (rv2 != SS_SUCCESS) { 2321 drbd_khelper(mdev, "pri-lost-after-sb"); 2322 } else { 2323 dev_warn(DEV, "Successfully gave up primary role.\n"); 2324 rv = hg; 2325 } 2326 } else 2327 rv = hg; 2328 } 2329 2330 return rv; 2331 } 2332 2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid, 2334 u64 bits, u64 flags) 2335 { 2336 if (!uuid) { 2337 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text); 2338 return; 2339 } 2340 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2341 text, 2342 (unsigned long long)uuid[UI_CURRENT], 2343 (unsigned long long)uuid[UI_BITMAP], 2344 (unsigned long long)uuid[UI_HISTORY_START], 2345 (unsigned long long)uuid[UI_HISTORY_END], 2346 (unsigned long long)bits, 2347 (unsigned long long)flags); 2348 } 2349 2350 /* 2351 100 after split brain try auto recover 2352 2 C_SYNC_SOURCE set BitMap 2353 1 C_SYNC_SOURCE use BitMap 2354 0 no Sync 2355 -1 C_SYNC_TARGET use BitMap 2356 -2 C_SYNC_TARGET set BitMap 2357 -100 after split brain, disconnect 2358 -1000 unrelated data 2359 -1091 requires proto 91 2360 -1096 requires proto 96 2361 */ 2362 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local) 2363 { 2364 u64 self, peer; 2365 int i, j; 2366 2367 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2368 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1); 2369 2370 *rule_nr = 10; 2371 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2372 return 0; 2373 2374 *rule_nr = 20; 2375 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2376 peer != UUID_JUST_CREATED) 2377 return -2; 2378 2379 *rule_nr = 30; 2380 if (self != UUID_JUST_CREATED && 2381 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2382 return 2; 2383 2384 if (self == peer) { 2385 int rct, dc; /* roles at crash time */ 2386 2387 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2388 2389 if (mdev->agreed_pro_version < 91) 2390 return -1091; 2391 2392 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2393 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2394 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2395 drbd_uuid_set_bm(mdev, 0UL); 2396 2397 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, 2398 mdev->state.disk >= D_NEGOTIATING ? 
drbd_bm_total_weight(mdev) : 0, 0); 2399 *rule_nr = 34; 2400 } else { 2401 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n"); 2402 *rule_nr = 36; 2403 } 2404 2405 return 1; 2406 } 2407 2408 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) { 2409 2410 if (mdev->agreed_pro_version < 91) 2411 return -1091; 2412 2413 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) && 2414 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 2415 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 2416 2417 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START]; 2418 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP]; 2419 mdev->p_uuid[UI_BITMAP] = 0UL; 2420 2421 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]); 2422 *rule_nr = 35; 2423 } else { 2424 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n"); 2425 *rule_nr = 37; 2426 } 2427 2428 return -1; 2429 } 2430 2431 /* Common power [off|failure] */ 2432 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) + 2433 (mdev->p_uuid[UI_FLAGS] & 2); 2434 /* lowest bit is set when we were primary, 2435 * next bit (weight 2) is set when peer was primary */ 2436 *rule_nr = 40; 2437 2438 switch (rct) { 2439 case 0: /* !self_pri && !peer_pri */ return 0; 2440 case 1: /* self_pri && !peer_pri */ return 1; 2441 case 2: /* !self_pri && peer_pri */ return -1; 2442 case 3: /* self_pri && peer_pri */ 2443 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags); 2444 return dc ? -1 : 1; 2445 } 2446 } 2447 2448 *rule_nr = 50; 2449 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1); 2450 if (self == peer) 2451 return -1; 2452 2453 *rule_nr = 51; 2454 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1); 2455 if (self == peer) { 2456 if (mdev->agreed_pro_version < 96 ? 2457 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 2458 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 2459 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) { 2460 /* The last P_SYNC_UUID did not get though. Undo the last start of 2461 resync as sync source modifications of the peer's UUIDs. */ 2462 2463 if (mdev->agreed_pro_version < 91) 2464 return -1091; 2465 2466 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START]; 2467 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1]; 2468 2469 dev_info(DEV, "Lost last syncUUID packet, corrected:\n"); 2470 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]); 2471 2472 return -1; 2473 } 2474 } 2475 2476 *rule_nr = 60; 2477 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2478 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2479 peer = mdev->p_uuid[i] & ~((u64)1); 2480 if (self == peer) 2481 return -2; 2482 } 2483 2484 *rule_nr = 70; 2485 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2486 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1); 2487 if (self == peer) 2488 return 1; 2489 2490 *rule_nr = 71; 2491 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 2492 if (self == peer) { 2493 if (mdev->agreed_pro_version < 96 ? 2494 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 2495 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 2496 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 2497 /* The last P_SYNC_UUID did not get though. 
Undo the last start of 2498 resync as sync source modifications of our UUIDs. */ 2499 2500 if (mdev->agreed_pro_version < 91) 2501 return -1091; 2502 2503 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]); 2504 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]); 2505 2506 dev_info(DEV, "Last syncUUID did not get through, corrected:\n"); 2507 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, 2508 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0); 2509 2510 return 1; 2511 } 2512 } 2513 2514 2515 *rule_nr = 80; 2516 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1); 2517 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2518 self = mdev->ldev->md.uuid[i] & ~((u64)1); 2519 if (self == peer) 2520 return 2; 2521 } 2522 2523 *rule_nr = 90; 2524 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2525 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1); 2526 if (self == peer && self != ((u64)0)) 2527 return 100; 2528 2529 *rule_nr = 100; 2530 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2531 self = mdev->ldev->md.uuid[i] & ~((u64)1); 2532 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 2533 peer = mdev->p_uuid[j] & ~((u64)1); 2534 if (self == peer) 2535 return -100; 2536 } 2537 } 2538 2539 return -1000; 2540 } 2541 2542 /* drbd_sync_handshake() returns the new conn state on success, or 2543 CONN_MASK (-1) on failure. 2544 */ 2545 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role, 2546 enum drbd_disk_state peer_disk) __must_hold(local) 2547 { 2548 int hg, rule_nr; 2549 enum drbd_conns rv = C_MASK; 2550 enum drbd_disk_state mydisk; 2551 2552 mydisk = mdev->state.disk; 2553 if (mydisk == D_NEGOTIATING) 2554 mydisk = mdev->new_state_tmp.disk; 2555 2556 dev_info(DEV, "drbd_sync_handshake:\n"); 2557 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0); 2558 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, 2559 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]); 2560 2561 hg = drbd_uuid_compare(mdev, &rule_nr); 2562 2563 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 2564 2565 if (hg == -1000) { 2566 dev_alert(DEV, "Unrelated data, aborting!\n"); 2567 return C_MASK; 2568 } 2569 if (hg < -1000) { 2570 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 2571 return C_MASK; 2572 } 2573 2574 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 2575 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 2576 int f = (hg == -100) || abs(hg) == 2; 2577 hg = mydisk > D_INCONSISTENT ? 1 : -1; 2578 if (f) 2579 hg = hg*2; 2580 dev_info(DEV, "Becoming sync %s due to disk states.\n", 2581 hg > 0 ? "source" : "target"); 2582 } 2583 2584 if (abs(hg) == 100) 2585 drbd_khelper(mdev, "initial-split-brain"); 2586 2587 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) { 2588 int pcount = (mdev->state.role == R_PRIMARY) 2589 + (peer_role == R_PRIMARY); 2590 int forced = (hg == -100); 2591 2592 switch (pcount) { 2593 case 0: 2594 hg = drbd_asb_recover_0p(mdev); 2595 break; 2596 case 1: 2597 hg = drbd_asb_recover_1p(mdev); 2598 break; 2599 case 2: 2600 hg = drbd_asb_recover_2p(mdev); 2601 break; 2602 } 2603 if (abs(hg) < 100) { 2604 dev_warn(DEV, "Split-Brain detected, %d primaries, " 2605 "automatically solved. Sync from %s node\n", 2606 pcount, (hg < 0) ? 
"peer" : "this"); 2607 if (forced) { 2608 dev_warn(DEV, "Doing a full sync, since" 2609 " UUIDs where ambiguous.\n"); 2610 hg = hg*2; 2611 } 2612 } 2613 } 2614 2615 if (hg == -100) { 2616 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1)) 2617 hg = -1; 2618 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1)) 2619 hg = 1; 2620 2621 if (abs(hg) < 100) 2622 dev_warn(DEV, "Split-Brain detected, manually solved. " 2623 "Sync from %s node\n", 2624 (hg < 0) ? "peer" : "this"); 2625 } 2626 2627 if (hg == -100) { 2628 /* FIXME this log message is not correct if we end up here 2629 * after an attempted attach on a diskless node. 2630 * We just refuse to attach -- well, we drop the "connection" 2631 * to that disk, in a way... */ 2632 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n"); 2633 drbd_khelper(mdev, "split-brain"); 2634 return C_MASK; 2635 } 2636 2637 if (hg > 0 && mydisk <= D_INCONSISTENT) { 2638 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n"); 2639 return C_MASK; 2640 } 2641 2642 if (hg < 0 && /* by intention we do not use mydisk here. */ 2643 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) { 2644 switch (mdev->net_conf->rr_conflict) { 2645 case ASB_CALL_HELPER: 2646 drbd_khelper(mdev, "pri-lost"); 2647 /* fall through */ 2648 case ASB_DISCONNECT: 2649 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n"); 2650 return C_MASK; 2651 case ASB_VIOLENTLY: 2652 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data" 2653 "assumption\n"); 2654 } 2655 } 2656 2657 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) { 2658 if (hg == 0) 2659 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n"); 2660 else 2661 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.", 2662 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 2663 abs(hg) >= 2 ? "full" : "bit-map based"); 2664 return C_MASK; 2665 } 2666 2667 if (abs(hg) >= 2) { 2668 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 2669 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 2670 BM_LOCKED_SET_ALLOWED)) 2671 return C_MASK; 2672 } 2673 2674 if (hg > 0) { /* become sync source. */ 2675 rv = C_WF_BITMAP_S; 2676 } else if (hg < 0) { /* become sync target */ 2677 rv = C_WF_BITMAP_T; 2678 } else { 2679 rv = C_CONNECTED; 2680 if (drbd_bm_total_weight(mdev)) { 2681 dev_info(DEV, "No resync, but %lu bits in bitmap!\n", 2682 drbd_bm_total_weight(mdev)); 2683 } 2684 } 2685 2686 return rv; 2687 } 2688 2689 /* returns 1 if invalid */ 2690 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self) 2691 { 2692 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 2693 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) || 2694 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL)) 2695 return 0; 2696 2697 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 2698 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL || 2699 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL) 2700 return 1; 2701 2702 /* everything else is valid if they are equal on both sides. */ 2703 if (peer == self) 2704 return 0; 2705 2706 /* everything es is invalid. 
*/ 2707 return 1; 2708 } 2709 2710 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 2711 { 2712 struct p_protocol *p = &mdev->data.rbuf.protocol; 2713 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 2714 int p_want_lose, p_two_primaries, cf; 2715 char p_integrity_alg[SHARED_SECRET_MAX] = ""; 2716 2717 p_proto = be32_to_cpu(p->protocol); 2718 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 2719 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 2720 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 2721 p_two_primaries = be32_to_cpu(p->two_primaries); 2722 cf = be32_to_cpu(p->conn_flags); 2723 p_want_lose = cf & CF_WANT_LOSE; 2724 2725 clear_bit(CONN_DRY_RUN, &mdev->flags); 2726 2727 if (cf & CF_DRY_RUN) 2728 set_bit(CONN_DRY_RUN, &mdev->flags); 2729 2730 if (p_proto != mdev->net_conf->wire_protocol) { 2731 dev_err(DEV, "incompatible communication protocols\n"); 2732 goto disconnect; 2733 } 2734 2735 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) { 2736 dev_err(DEV, "incompatible after-sb-0pri settings\n"); 2737 goto disconnect; 2738 } 2739 2740 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) { 2741 dev_err(DEV, "incompatible after-sb-1pri settings\n"); 2742 goto disconnect; 2743 } 2744 2745 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) { 2746 dev_err(DEV, "incompatible after-sb-2pri settings\n"); 2747 goto disconnect; 2748 } 2749 2750 if (p_want_lose && mdev->net_conf->want_lose) { 2751 dev_err(DEV, "both sides have the 'want_lose' flag set\n"); 2752 goto disconnect; 2753 } 2754 2755 if (p_two_primaries != mdev->net_conf->two_primaries) { 2756 dev_err(DEV, "incompatible setting of the two-primaries options\n"); 2757 goto disconnect; 2758 } 2759 2760 if (mdev->agreed_pro_version >= 87) { 2761 unsigned char *my_alg = mdev->net_conf->integrity_alg; 2762 2763 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size) 2764 return false; 2765 2766 p_integrity_alg[SHARED_SECRET_MAX-1] = 0; 2767 if (strcmp(p_integrity_alg, my_alg)) { 2768 dev_err(DEV, "incompatible setting of the data-integrity-alg\n"); 2769 goto disconnect; 2770 } 2771 dev_info(DEV, "data-integrity-alg: %s\n", 2772 my_alg[0] ? my_alg : (unsigned char *)"<not-used>"); 2773 } 2774 2775 return true; 2776 2777 disconnect: 2778 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2779 return false; 2780 } 2781 2782 /* helper function 2783 * input: alg name, feature name 2784 * return: NULL (alg name was "") 2785 * ERR_PTR(error) if something goes wrong 2786 * or the crypto hash ptr, if it worked out ok. 
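 *
 * Typical use, as in receive_SyncParam() below:
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;
 * A NULL return simply means no algorithm was configured.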
*/ 2787 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev, 2788 const char *alg, const char *name) 2789 { 2790 struct crypto_hash *tfm; 2791 2792 if (!alg[0]) 2793 return NULL; 2794 2795 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 2796 if (IS_ERR(tfm)) { 2797 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n", 2798 alg, name, PTR_ERR(tfm)); 2799 return tfm; 2800 } 2801 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) { 2802 crypto_free_hash(tfm); 2803 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name); 2804 return ERR_PTR(-EINVAL); 2805 } 2806 return tfm; 2807 } 2808 2809 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size) 2810 { 2811 int ok = true; 2812 struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95; 2813 unsigned int header_size, data_size, exp_max_sz; 2814 struct crypto_hash *verify_tfm = NULL; 2815 struct crypto_hash *csums_tfm = NULL; 2816 const int apv = mdev->agreed_pro_version; 2817 int *rs_plan_s = NULL; 2818 int fifo_size = 0; 2819 2820 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 2821 : apv == 88 ? sizeof(struct p_rs_param) 2822 + SHARED_SECRET_MAX 2823 : apv <= 94 ? sizeof(struct p_rs_param_89) 2824 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 2825 2826 if (packet_size > exp_max_sz) { 2827 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n", 2828 packet_size, exp_max_sz); 2829 return false; 2830 } 2831 2832 if (apv <= 88) { 2833 header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80); 2834 data_size = packet_size - header_size; 2835 } else if (apv <= 94) { 2836 header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80); 2837 data_size = packet_size - header_size; 2838 D_ASSERT(data_size == 0); 2839 } else { 2840 header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80); 2841 data_size = packet_size - header_size; 2842 D_ASSERT(data_size == 0); 2843 } 2844 2845 /* initialize verify_alg and csums_alg */ 2846 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 2847 2848 if (drbd_recv(mdev, &p->head.payload, header_size) != header_size) 2849 return false; 2850 2851 mdev->sync_conf.rate = be32_to_cpu(p->rate); 2852 2853 if (apv >= 88) { 2854 if (apv == 88) { 2855 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 2856 dev_err(DEV, "verify-alg of wrong size, " 2857 "peer wants %u, accepting only up to %u byte\n", 2858 data_size, SHARED_SECRET_MAX); 2859 return false; 2860 } 2861 2862 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size) 2863 return false; 2864 2865 /* we expect NUL terminated string */ 2866 /* but just in case someone tries to be evil */ 2867 D_ASSERT(p->verify_alg[data_size-1] == 0); 2868 p->verify_alg[data_size-1] = 0; 2869 2870 } else /* apv >= 89 */ { 2871 /* we still expect NUL terminated strings */ 2872 /* but just in case someone tries to be evil */ 2873 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0); 2874 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0); 2875 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 2876 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 2877 } 2878 2879 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) { 2880 if (mdev->state.conn == C_WF_REPORT_PARAMS) { 2881 dev_err(DEV, "Different verify-alg settings. 
me=\"%s\" peer=\"%s\"\n", 2882 mdev->sync_conf.verify_alg, p->verify_alg); 2883 goto disconnect; 2884 } 2885 verify_tfm = drbd_crypto_alloc_digest_safe(mdev, 2886 p->verify_alg, "verify-alg"); 2887 if (IS_ERR(verify_tfm)) { 2888 verify_tfm = NULL; 2889 goto disconnect; 2890 } 2891 } 2892 2893 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) { 2894 if (mdev->state.conn == C_WF_REPORT_PARAMS) { 2895 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n", 2896 mdev->sync_conf.csums_alg, p->csums_alg); 2897 goto disconnect; 2898 } 2899 csums_tfm = drbd_crypto_alloc_digest_safe(mdev, 2900 p->csums_alg, "csums-alg"); 2901 if (IS_ERR(csums_tfm)) { 2902 csums_tfm = NULL; 2903 goto disconnect; 2904 } 2905 } 2906 2907 if (apv > 94) { 2908 mdev->sync_conf.rate = be32_to_cpu(p->rate); 2909 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 2910 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target); 2911 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target); 2912 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate); 2913 2914 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; 2915 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) { 2916 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL); 2917 if (!rs_plan_s) { 2918 dev_err(DEV, "kmalloc of fifo_buffer failed"); 2919 goto disconnect; 2920 } 2921 } 2922 } 2923 2924 spin_lock(&mdev->peer_seq_lock); 2925 /* lock against drbd_nl_syncer_conf() */ 2926 if (verify_tfm) { 2927 strcpy(mdev->sync_conf.verify_alg, p->verify_alg); 2928 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1; 2929 crypto_free_hash(mdev->verify_tfm); 2930 mdev->verify_tfm = verify_tfm; 2931 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg); 2932 } 2933 if (csums_tfm) { 2934 strcpy(mdev->sync_conf.csums_alg, p->csums_alg); 2935 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1; 2936 crypto_free_hash(mdev->csums_tfm); 2937 mdev->csums_tfm = csums_tfm; 2938 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg); 2939 } 2940 if (fifo_size != mdev->rs_plan_s.size) { 2941 kfree(mdev->rs_plan_s.values); 2942 mdev->rs_plan_s.values = rs_plan_s; 2943 mdev->rs_plan_s.size = fifo_size; 2944 mdev->rs_planed = 0; 2945 } 2946 spin_unlock(&mdev->peer_seq_lock); 2947 } 2948 2949 return ok; 2950 disconnect: 2951 /* just for completeness: actually not needed, 2952 * as this is not reached if csums_tfm was ok. */ 2953 crypto_free_hash(csums_tfm); 2954 /* but free the verify_tfm again, if csums_tfm did not work out */ 2955 crypto_free_hash(verify_tfm); 2956 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2957 return false; 2958 } 2959 2960 /* warn if the arguments differ by more than 12.5% */ 2961 static void warn_if_differ_considerably(struct drbd_conf *mdev, 2962 const char *s, sector_t a, sector_t b) 2963 { 2964 sector_t d; 2965 if (a == 0 || b == 0) 2966 return; 2967 d = (a > b) ? (a - b) : (b - a); 2968 if (d > (a>>3) || d > (b>>3)) 2969 dev_warn(DEV, "Considerable difference in %s: %llus vs. 
%llus\n", s, 2970 (unsigned long long)a, (unsigned long long)b); 2971 } 2972 2973 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 2974 { 2975 struct p_sizes *p = &mdev->data.rbuf.sizes; 2976 enum determine_dev_size dd = unchanged; 2977 sector_t p_size, p_usize, my_usize; 2978 int ldsc = 0; /* local disk size changed */ 2979 enum dds_flags ddsf; 2980 2981 p_size = be64_to_cpu(p->d_size); 2982 p_usize = be64_to_cpu(p->u_size); 2983 2984 if (p_size == 0 && mdev->state.disk == D_DISKLESS) { 2985 dev_err(DEV, "some backing storage is needed\n"); 2986 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 2987 return false; 2988 } 2989 2990 /* just store the peer's disk size for now. 2991 * we still need to figure out whether we accept that. */ 2992 mdev->p_size = p_size; 2993 2994 if (get_ldev(mdev)) { 2995 warn_if_differ_considerably(mdev, "lower level device sizes", 2996 p_size, drbd_get_max_capacity(mdev->ldev)); 2997 warn_if_differ_considerably(mdev, "user requested size", 2998 p_usize, mdev->ldev->dc.disk_size); 2999 3000 /* if this is the first connect, or an otherwise expected 3001 * param exchange, choose the minimum */ 3002 if (mdev->state.conn == C_WF_REPORT_PARAMS) 3003 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size, 3004 p_usize); 3005 3006 my_usize = mdev->ldev->dc.disk_size; 3007 3008 if (mdev->ldev->dc.disk_size != p_usize) { 3009 mdev->ldev->dc.disk_size = p_usize; 3010 dev_info(DEV, "Peer sets u_size to %lu sectors\n", 3011 (unsigned long)mdev->ldev->dc.disk_size); 3012 } 3013 3014 /* Never shrink a device with usable data during connect. 3015 But allow online shrinking if we are connected. */ 3016 if (drbd_new_dev_size(mdev, mdev->ldev, 0) < 3017 drbd_get_capacity(mdev->this_bdev) && 3018 mdev->state.disk >= D_OUTDATED && 3019 mdev->state.conn < C_CONNECTED) { 3020 dev_err(DEV, "The peer's disk size is too small!\n"); 3021 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3022 mdev->ldev->dc.disk_size = my_usize; 3023 put_ldev(mdev); 3024 return false; 3025 } 3026 put_ldev(mdev); 3027 } 3028 3029 ddsf = be16_to_cpu(p->dds_flags); 3030 if (get_ldev(mdev)) { 3031 dd = drbd_determine_dev_size(mdev, ddsf); 3032 put_ldev(mdev); 3033 if (dd == dev_size_error) 3034 return false; 3035 drbd_md_sync(mdev); 3036 } else { 3037 /* I am diskless, need to accept the peer's size. */ 3038 drbd_set_my_capacity(mdev, p_size); 3039 } 3040 3041 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3042 drbd_reconsider_max_bio_size(mdev); 3043 3044 if (get_ldev(mdev)) { 3045 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) { 3046 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev); 3047 ldsc = 1; 3048 } 3049 3050 put_ldev(mdev); 3051 } 3052 3053 if (mdev->state.conn > C_WF_REPORT_PARAMS) { 3054 if (be64_to_cpu(p->c_size) != 3055 drbd_get_capacity(mdev->this_bdev) || ldsc) { 3056 /* we have different sizes, probably peer 3057 * needs to know my new size... 
*/ 3058 drbd_send_sizes(mdev, 0, ddsf); 3059 } 3060 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) || 3061 (dd == grew && mdev->state.conn == C_CONNECTED)) { 3062 if (mdev->state.pdsk >= D_INCONSISTENT && 3063 mdev->state.disk >= D_INCONSISTENT) { 3064 if (ddsf & DDSF_NO_RESYNC) 3065 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n"); 3066 else 3067 resync_after_online_grow(mdev); 3068 } else 3069 set_bit(RESYNC_AFTER_NEG, &mdev->flags); 3070 } 3071 } 3072 3073 return true; 3074 } 3075 3076 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3077 { 3078 struct p_uuids *p = &mdev->data.rbuf.uuids; 3079 u64 *p_uuid; 3080 int i, updated_uuids = 0; 3081 3082 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 3083 3084 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 3085 p_uuid[i] = be64_to_cpu(p->uuid[i]); 3086 3087 kfree(mdev->p_uuid); 3088 mdev->p_uuid = p_uuid; 3089 3090 if (mdev->state.conn < C_CONNECTED && 3091 mdev->state.disk < D_INCONSISTENT && 3092 mdev->state.role == R_PRIMARY && 3093 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 3094 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n", 3095 (unsigned long long)mdev->ed_uuid); 3096 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3097 return false; 3098 } 3099 3100 if (get_ldev(mdev)) { 3101 int skip_initial_sync = 3102 mdev->state.conn == C_CONNECTED && 3103 mdev->agreed_pro_version >= 90 && 3104 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 3105 (p_uuid[UI_FLAGS] & 8); 3106 if (skip_initial_sync) { 3107 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n"); 3108 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write, 3109 "clear_n_write from receive_uuids", 3110 BM_LOCKED_TEST_ALLOWED); 3111 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]); 3112 _drbd_uuid_set(mdev, UI_BITMAP, 0); 3113 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3114 CS_VERBOSE, NULL); 3115 drbd_md_sync(mdev); 3116 updated_uuids = 1; 3117 } 3118 put_ldev(mdev); 3119 } else if (mdev->state.disk < D_INCONSISTENT && 3120 mdev->state.role == R_PRIMARY) { 3121 /* I am a diskless primary, the peer just created a new current UUID 3122 for me. */ 3123 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]); 3124 } 3125 3126 /* Before we test for the disk state, we should wait until an eventually 3127 ongoing cluster wide state change is finished. That is important if 3128 we are primary and are detaching from our disk. We need to see the 3129 new disk state... */ 3130 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags)); 3131 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT) 3132 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]); 3133 3134 if (updated_uuids) 3135 drbd_print_uuids(mdev, "receiver updated UUIDs to"); 3136 3137 return true; 3138 } 3139 3140 /** 3141 * convert_state() - Converts the peer's view of the cluster state to our point of view 3142 * @ps: The state as seen by the peer. 
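 *
 * Roles and disk states are mirrored: the peer's "role" becomes our "peer",
 * its "disk" becomes our "pdsk" (and vice versa), and asymmetric connection
 * states such as StartingSyncS/T are swapped via c_tab below.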
3143 */ 3144 static union drbd_state convert_state(union drbd_state ps) 3145 { 3146 union drbd_state ms; 3147 3148 static enum drbd_conns c_tab[] = { 3149 [C_CONNECTED] = C_CONNECTED, 3150 3151 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3152 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3153 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3154 [C_VERIFY_S] = C_VERIFY_T, 3155 [C_MASK] = C_MASK, 3156 }; 3157 3158 ms.i = ps.i; 3159 3160 ms.conn = c_tab[ps.conn]; 3161 ms.peer = ps.role; 3162 ms.role = ps.peer; 3163 ms.pdsk = ps.disk; 3164 ms.disk = ps.pdsk; 3165 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3166 3167 return ms; 3168 } 3169 3170 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3171 { 3172 struct p_req_state *p = &mdev->data.rbuf.req_state; 3173 union drbd_state mask, val; 3174 enum drbd_state_rv rv; 3175 3176 mask.i = be32_to_cpu(p->mask); 3177 val.i = be32_to_cpu(p->val); 3178 3179 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) && 3180 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) { 3181 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG); 3182 return true; 3183 } 3184 3185 mask = convert_state(mask); 3186 val = convert_state(val); 3187 3188 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val); 3189 3190 drbd_send_sr_reply(mdev, rv); 3191 drbd_md_sync(mdev); 3192 3193 return true; 3194 } 3195 3196 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3197 { 3198 struct p_state *p = &mdev->data.rbuf.state; 3199 union drbd_state os, ns, peer_state; 3200 enum drbd_disk_state real_peer_disk; 3201 enum chg_state_flags cs_flags; 3202 int rv; 3203 3204 peer_state.i = be32_to_cpu(p->state); 3205 3206 real_peer_disk = peer_state.disk; 3207 if (peer_state.disk == D_NEGOTIATING) { 3208 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 3209 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 3210 } 3211 3212 spin_lock_irq(&mdev->req_lock); 3213 retry: 3214 os = ns = mdev->state; 3215 spin_unlock_irq(&mdev->req_lock); 3216 3217 /* If some other part of the code (asender thread, timeout) 3218 * already decided to close the connection again, 3219 * we must not "re-establish" it here. */ 3220 if (os.conn <= C_TEAR_DOWN) 3221 return false; 3222 3223 /* If this is the "end of sync" confirmation, usually the peer disk 3224 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 3225 * set) resync started in PausedSyncT, or if the timing of pause-/ 3226 * unpause-sync events has been "just right", the peer disk may 3227 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 3228 */ 3229 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 3230 real_peer_disk == D_UP_TO_DATE && 3231 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 3232 /* If we are (becoming) SyncSource, but peer is still in sync 3233 * preparation, ignore its uptodate-ness to avoid flapping, it 3234 * will change to inconsistent once the peer reaches active 3235 * syncing states. 3236 * It may have changed syncer-paused flags, however, so we 3237 * cannot ignore this completely. */ 3238 if (peer_state.conn > C_CONNECTED && 3239 peer_state.conn < C_SYNC_SOURCE) 3240 real_peer_disk = D_INCONSISTENT; 3241 3242 /* if peer_state changes to connected at the same time, 3243 * it explicitly notifies us that it finished resync. 3244 * Maybe we should finish it up, too? 
*/ 3245 else if (os.conn >= C_SYNC_SOURCE && 3246 peer_state.conn == C_CONNECTED) { 3247 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed) 3248 drbd_resync_finished(mdev); 3249 return true; 3250 } 3251 } 3252 3253 /* peer says his disk is inconsistent, while we think it is uptodate, 3254 * and this happens while the peer still thinks we have a sync going on, 3255 * but we think we are already done with the sync. 3256 * We ignore this to avoid flapping pdsk. 3257 * This should not happen, if the peer is a recent version of drbd. */ 3258 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 3259 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 3260 real_peer_disk = D_UP_TO_DATE; 3261 3262 if (ns.conn == C_WF_REPORT_PARAMS) 3263 ns.conn = C_CONNECTED; 3264 3265 if (peer_state.conn == C_AHEAD) 3266 ns.conn = C_BEHIND; 3267 3268 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING && 3269 get_ldev_if_state(mdev, D_NEGOTIATING)) { 3270 int cr; /* consider resync */ 3271 3272 /* if we established a new connection */ 3273 cr = (os.conn < C_CONNECTED); 3274 /* if we had an established connection 3275 * and one of the nodes newly attaches a disk */ 3276 cr |= (os.conn == C_CONNECTED && 3277 (peer_state.disk == D_NEGOTIATING || 3278 os.disk == D_NEGOTIATING)); 3279 /* if we have both been inconsistent, and the peer has been 3280 * forced to be UpToDate with --overwrite-data */ 3281 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags); 3282 /* if we had been plain connected, and the admin requested to 3283 * start a sync by "invalidate" or "invalidate-remote" */ 3284 cr |= (os.conn == C_CONNECTED && 3285 (peer_state.conn >= C_STARTING_SYNC_S && 3286 peer_state.conn <= C_WF_BITMAP_T)); 3287 3288 if (cr) 3289 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk); 3290 3291 put_ldev(mdev); 3292 if (ns.conn == C_MASK) { 3293 ns.conn = C_CONNECTED; 3294 if (mdev->state.disk == D_NEGOTIATING) { 3295 drbd_force_state(mdev, NS(disk, D_FAILED)); 3296 } else if (peer_state.disk == D_NEGOTIATING) { 3297 dev_err(DEV, "Disk attach process on the peer node was aborted.\n"); 3298 peer_state.disk = D_DISKLESS; 3299 real_peer_disk = D_DISKLESS; 3300 } else { 3301 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags)) 3302 return false; 3303 D_ASSERT(os.conn == C_WF_REPORT_PARAMS); 3304 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3305 return false; 3306 } 3307 } 3308 } 3309 3310 spin_lock_irq(&mdev->req_lock); 3311 if (mdev->state.i != os.i) 3312 goto retry; 3313 clear_bit(CONSIDER_RESYNC, &mdev->flags); 3314 ns.peer = peer_state.role; 3315 ns.pdsk = real_peer_disk; 3316 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 3317 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 3318 ns.disk = mdev->new_state_tmp.disk; 3319 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 3320 if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 3321 test_bit(NEW_CUR_UUID, &mdev->flags)) { 3322 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this 3323 for temporal network outages! 
*/ 3324 spin_unlock_irq(&mdev->req_lock); 3325 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 3326 tl_clear(mdev); 3327 drbd_uuid_new_current(mdev); 3328 clear_bit(NEW_CUR_UUID, &mdev->flags); 3329 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0)); 3330 return false; 3331 } 3332 rv = _drbd_set_state(mdev, ns, cs_flags, NULL); 3333 ns = mdev->state; 3334 spin_unlock_irq(&mdev->req_lock); 3335 3336 if (rv < SS_SUCCESS) { 3337 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 3338 return false; 3339 } 3340 3341 if (os.conn > C_WF_REPORT_PARAMS) { 3342 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 3343 peer_state.disk != D_NEGOTIATING ) { 3344 /* we want resync, peer has not yet decided to sync... */ 3345 /* Nowadays only used when forcing a node into primary role and 3346 setting its disk to UpToDate with that */ 3347 drbd_send_uuids(mdev); 3348 drbd_send_current_state(mdev); 3349 } 3350 } 3351 3352 mdev->net_conf->want_lose = 0; 3353 3354 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */ 3355 3356 return true; 3357 } 3358 3359 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3360 { 3361 struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid; 3362 3363 wait_event(mdev->misc_wait, 3364 mdev->state.conn == C_WF_SYNC_UUID || 3365 mdev->state.conn == C_BEHIND || 3366 mdev->state.conn < C_CONNECTED || 3367 mdev->state.disk < D_NEGOTIATING); 3368 3369 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */ 3370 3371 /* Here the _drbd_uuid_ functions are right, current should 3372 _not_ be rotated into the history */ 3373 if (get_ldev_if_state(mdev, D_NEGOTIATING)) { 3374 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid)); 3375 _drbd_uuid_set(mdev, UI_BITMAP, 0UL); 3376 3377 drbd_print_uuids(mdev, "updated sync uuid"); 3378 drbd_start_resync(mdev, C_SYNC_TARGET); 3379 3380 put_ldev(mdev); 3381 } else 3382 dev_err(DEV, "Ignoring SyncUUID packet!\n"); 3383 3384 return true; 3385 } 3386 3387 /** 3388 * receive_bitmap_plain 3389 * 3390 * Return 0 when done, 1 when another iteration is needed, and a negative error 3391 * code upon failure. 3392 */ 3393 static int 3394 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size, 3395 unsigned long *buffer, struct bm_xfer_ctx *c) 3396 { 3397 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset); 3398 unsigned want = num_words * sizeof(long); 3399 int err; 3400 3401 if (want != data_size) { 3402 dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size); 3403 return -EIO; 3404 } 3405 if (want == 0) 3406 return 0; 3407 err = drbd_recv(mdev, buffer, want); 3408 if (err != want) { 3409 if (err >= 0) 3410 err = -EIO; 3411 return err; 3412 } 3413 3414 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer); 3415 3416 c->word_offset += num_words; 3417 c->bit_offset = c->word_offset * BITS_PER_LONG; 3418 if (c->bit_offset > c->bm_bits) 3419 c->bit_offset = c->bm_bits; 3420 3421 return 1; 3422 } 3423 3424 /** 3425 * recv_bm_rle_bits 3426 * 3427 * Return 0 when done, 1 when another iteration is needed, and a negative error 3428 * code upon failure. 
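 *
 * The payload is a bit stream of variable-length-encoded run lengths
 * (see drbd_vli.h); runs alternate between clear and set bits, and
 * DCBP_get_start() tells us whether the very first run describes set bits.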
3429 */ 3430 static int 3431 recv_bm_rle_bits(struct drbd_conf *mdev, 3432 struct p_compressed_bm *p, 3433 struct bm_xfer_ctx *c) 3434 { 3435 struct bitstream bs; 3436 u64 look_ahead; 3437 u64 rl; 3438 u64 tmp; 3439 unsigned long s = c->bit_offset; 3440 unsigned long e; 3441 int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head)); 3442 int toggle = DCBP_get_start(p); 3443 int have; 3444 int bits; 3445 3446 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p)); 3447 3448 bits = bitstream_get_bits(&bs, &look_ahead, 64); 3449 if (bits < 0) 3450 return -EIO; 3451 3452 for (have = bits; have > 0; s += rl, toggle = !toggle) { 3453 bits = vli_decode_bits(&rl, look_ahead); 3454 if (bits <= 0) 3455 return -EIO; 3456 3457 if (toggle) { 3458 e = s + rl -1; 3459 if (e >= c->bm_bits) { 3460 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 3461 return -EIO; 3462 } 3463 _drbd_bm_set_bits(mdev, s, e); 3464 } 3465 3466 if (have < bits) { 3467 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 3468 have, bits, look_ahead, 3469 (unsigned int)(bs.cur.b - p->code), 3470 (unsigned int)bs.buf_len); 3471 return -EIO; 3472 } 3473 look_ahead >>= bits; 3474 have -= bits; 3475 3476 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 3477 if (bits < 0) 3478 return -EIO; 3479 look_ahead |= tmp << have; 3480 have += bits; 3481 } 3482 3483 c->bit_offset = s; 3484 bm_xfer_ctx_bit_to_word_offset(c); 3485 3486 return (s != c->bm_bits); 3487 } 3488 3489 /** 3490 * decode_bitmap_c 3491 * 3492 * Return 0 when done, 1 when another iteration is needed, and a negative error 3493 * code upon failure. 3494 */ 3495 static int 3496 decode_bitmap_c(struct drbd_conf *mdev, 3497 struct p_compressed_bm *p, 3498 struct bm_xfer_ctx *c) 3499 { 3500 if (DCBP_get_code(p) == RLE_VLI_Bits) 3501 return recv_bm_rle_bits(mdev, p, c); 3502 3503 /* other variants had been implemented for evaluation, 3504 * but have been dropped as this one turned out to be "best" 3505 * during all our tests. */ 3506 3507 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 3508 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); 3509 return -EIO; 3510 } 3511 3512 void INFO_bm_xfer_stats(struct drbd_conf *mdev, 3513 const char *direction, struct bm_xfer_ctx *c) 3514 { 3515 /* what would it take to transfer it "plaintext" */ 3516 unsigned plain = sizeof(struct p_header80) * 3517 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1) 3518 + c->bm_words * sizeof(long); 3519 unsigned total = c->bytes[0] + c->bytes[1]; 3520 unsigned r; 3521 3522 /* total can not be zero. but just in case: */ 3523 if (total == 0) 3524 return; 3525 3526 /* don't report if not compressed */ 3527 if (total >= plain) 3528 return; 3529 3530 /* total < plain. check for overflow, still */ 3531 r = (total > UINT_MAX/1000) ? (total / (plain/1000)) 3532 : (1000 * total / plain); 3533 3534 if (r > 1000) 3535 r = 1000; 3536 3537 r = 1000 - r; 3538 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), " 3539 "total %u; compression: %u.%u%%\n", 3540 direction, 3541 c->bytes[1], c->packets[1], 3542 c->bytes[0], c->packets[0], 3543 total, r/10, r % 10); 3544 } 3545 3546 /* Since we are processing the bitfield from lower addresses to higher, 3547 it does not matter if the process it in 32 bit chunks or 64 bit 3548 chunks as long as it is little endian. (Understand it as byte stream, 3549 beginning with the lowest byte...) 
If we would use big endian 3550 we would need to process it from the highest address to the lowest, 3551 in order to be agnostic to the 32 vs 64 bits issue. 3552 3553 returns 0 on failure, 1 if we successfully received it. */ 3554 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3555 { 3556 struct bm_xfer_ctx c; 3557 void *buffer; 3558 int err; 3559 int ok = false; 3560 struct p_header80 *h = &mdev->data.rbuf.header.h80; 3561 3562 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED); 3563 /* you are supposed to send additional out-of-sync information 3564 * if you actually set bits during this phase */ 3565 3566 /* maybe we should use some per thread scratch page, 3567 * and allocate that during initial device creation? */ 3568 buffer = (unsigned long *) __get_free_page(GFP_NOIO); 3569 if (!buffer) { 3570 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__); 3571 goto out; 3572 } 3573 3574 c = (struct bm_xfer_ctx) { 3575 .bm_bits = drbd_bm_bits(mdev), 3576 .bm_words = drbd_bm_words(mdev), 3577 }; 3578 3579 for(;;) { 3580 if (cmd == P_BITMAP) { 3581 err = receive_bitmap_plain(mdev, data_size, buffer, &c); 3582 } else if (cmd == P_COMPRESSED_BITMAP) { 3583 /* MAYBE: sanity check that we speak proto >= 90, 3584 * and the feature is enabled! */ 3585 struct p_compressed_bm *p; 3586 3587 if (data_size > BM_PACKET_PAYLOAD_BYTES) { 3588 dev_err(DEV, "ReportCBitmap packet too large\n"); 3589 goto out; 3590 } 3591 /* use the page buff */ 3592 p = buffer; 3593 memcpy(p, h, sizeof(*h)); 3594 if (drbd_recv(mdev, p->head.payload, data_size) != data_size) 3595 goto out; 3596 if (data_size <= (sizeof(*p) - sizeof(p->head))) { 3597 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size); 3598 goto out; 3599 } 3600 err = decode_bitmap_c(mdev, p, &c); 3601 } else { 3602 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd); 3603 goto out; 3604 } 3605 3606 c.packets[cmd == P_BITMAP]++; 3607 c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size; 3608 3609 if (err <= 0) { 3610 if (err < 0) 3611 goto out; 3612 break; 3613 } 3614 if (!drbd_recv_header(mdev, &cmd, &data_size)) 3615 goto out; 3616 } 3617 3618 INFO_bm_xfer_stats(mdev, "receive", &c); 3619 3620 if (mdev->state.conn == C_WF_BITMAP_T) { 3621 enum drbd_state_rv rv; 3622 3623 ok = !drbd_send_bitmap(mdev); 3624 if (!ok) 3625 goto out; 3626 /* Omit CS_ORDERED with this state transition to avoid deadlocks. 
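   (Hence the plain _drbd_request_state() call below, passing CS_VERBOSE only.)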
*/ 3627 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 3628 D_ASSERT(rv == SS_SUCCESS); 3629 } else if (mdev->state.conn != C_WF_BITMAP_S) { 3630 /* admin may have requested C_DISCONNECTING, 3631 * other threads may have noticed network errors */ 3632 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n", 3633 drbd_conn_str(mdev->state.conn)); 3634 } 3635 3636 ok = true; 3637 out: 3638 drbd_bm_unlock(mdev); 3639 if (ok && mdev->state.conn == C_WF_BITMAP_S) 3640 drbd_start_resync(mdev, C_SYNC_SOURCE); 3641 free_page((unsigned long) buffer); 3642 return ok; 3643 } 3644 3645 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3646 { 3647 /* TODO zero copy sink :) */ 3648 static char sink[128]; 3649 int size, want, r; 3650 3651 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n", 3652 cmd, data_size); 3653 3654 size = data_size; 3655 while (size > 0) { 3656 want = min_t(int, size, sizeof(sink)); 3657 r = drbd_recv(mdev, sink, want); 3658 ERR_IF(r <= 0) break; 3659 size -= r; 3660 } 3661 return size == 0; 3662 } 3663 3664 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3665 { 3666 /* Make sure we've acked all the TCP data associated 3667 * with the data requests being unplugged */ 3668 drbd_tcp_quickack(mdev->data.socket); 3669 3670 return true; 3671 } 3672 3673 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size) 3674 { 3675 struct p_block_desc *p = &mdev->data.rbuf.block_desc; 3676 3677 switch (mdev->state.conn) { 3678 case C_WF_SYNC_UUID: 3679 case C_WF_BITMAP_T: 3680 case C_BEHIND: 3681 break; 3682 default: 3683 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 3684 drbd_conn_str(mdev->state.conn)); 3685 } 3686 3687 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 3688 3689 return true; 3690 } 3691 3692 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive); 3693 3694 struct data_cmd { 3695 int expect_payload; 3696 size_t pkt_size; 3697 drbd_cmd_handler_f function; 3698 }; 3699 3700 static struct data_cmd drbd_cmd_handler[] = { 3701 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 3702 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 3703 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } , 3704 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 3705 [P_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , 3706 [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } , 3707 [P_UNPLUG_REMOTE] = { 0, sizeof(struct p_header80), receive_UnplugRemote }, 3708 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 3709 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 3710 [P_SYNC_PARAM] = { 1, sizeof(struct p_header80), receive_SyncParam }, 3711 [P_SYNC_PARAM89] = { 1, sizeof(struct p_header80), receive_SyncParam }, 3712 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 3713 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 3714 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 3715 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 3716 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 3717 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 3718 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), 
receive_DataRequest }, 3719 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 3720 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 3721 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 3722 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 3723 /* anything missing from this table is in 3724 * the asender_tbl, see get_asender_cmd */ 3725 [P_MAX_CMD] = { 0, 0, NULL }, 3726 }; 3727 3728 /* All handler functions that expect a sub-header get that sub-heder in 3729 mdev->data.rbuf.header.head.payload. 3730 3731 Usually in mdev->data.rbuf.header.head the callback can find the usual 3732 p_header, but they may not rely on that. Since there is also p_header95 ! 3733 */ 3734 3735 static void drbdd(struct drbd_conf *mdev) 3736 { 3737 union p_header *header = &mdev->data.rbuf.header; 3738 unsigned int packet_size; 3739 enum drbd_packets cmd; 3740 size_t shs; /* sub header size */ 3741 int rv; 3742 3743 while (get_t_state(&mdev->receiver) == Running) { 3744 drbd_thread_current_set_cpu(mdev); 3745 if (!drbd_recv_header(mdev, &cmd, &packet_size)) 3746 goto err_out; 3747 3748 if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) { 3749 dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size); 3750 goto err_out; 3751 } 3752 3753 shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header); 3754 if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) { 3755 dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size); 3756 goto err_out; 3757 } 3758 3759 if (shs) { 3760 rv = drbd_recv(mdev, &header->h80.payload, shs); 3761 if (unlikely(rv != shs)) { 3762 if (!signal_pending(current)) 3763 dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv); 3764 goto err_out; 3765 } 3766 } 3767 3768 rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs); 3769 3770 if (unlikely(!rv)) { 3771 dev_err(DEV, "error receiving %s, l: %d!\n", 3772 cmdname(cmd), packet_size); 3773 goto err_out; 3774 } 3775 } 3776 3777 if (0) { 3778 err_out: 3779 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR)); 3780 } 3781 /* If we leave here, we probably want to update at least the 3782 * "Connected" indicator on stable storage. Do so explicitly here. */ 3783 drbd_md_sync(mdev); 3784 } 3785 3786 void drbd_flush_workqueue(struct drbd_conf *mdev) 3787 { 3788 struct drbd_wq_barrier barr; 3789 3790 barr.w.cb = w_prev_work_done; 3791 init_completion(&barr.done); 3792 drbd_queue_work(&mdev->data.work, &barr.w); 3793 wait_for_completion(&barr.done); 3794 } 3795 3796 void drbd_free_tl_hash(struct drbd_conf *mdev) 3797 { 3798 struct hlist_head *h; 3799 3800 spin_lock_irq(&mdev->req_lock); 3801 3802 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) { 3803 spin_unlock_irq(&mdev->req_lock); 3804 return; 3805 } 3806 /* paranoia code */ 3807 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++) 3808 if (h->first) 3809 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n", 3810 (int)(h - mdev->ee_hash), h->first); 3811 kfree(mdev->ee_hash); 3812 mdev->ee_hash = NULL; 3813 mdev->ee_hash_s = 0; 3814 3815 /* We may not have had the chance to wait for all locally pending 3816 * application requests. The hlist_add_fake() prevents access after 3817 * free on master bio completion. 
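 * (After hlist_add_fake() the node merely appears hashed and points back
 * into itself, so a later hlist_del() from the completion path no longer
 * touches the freed hash table.)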
*/ 3818 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) { 3819 struct drbd_request *req; 3820 struct hlist_node *pos, *n; 3821 hlist_for_each_entry_safe(req, pos, n, h, collision) { 3822 hlist_del_init(&req->collision); 3823 hlist_add_fake(&req->collision); 3824 } 3825 } 3826 3827 kfree(mdev->tl_hash); 3828 mdev->tl_hash = NULL; 3829 mdev->tl_hash_s = 0; 3830 spin_unlock_irq(&mdev->req_lock); 3831 } 3832 3833 static void drbd_disconnect(struct drbd_conf *mdev) 3834 { 3835 enum drbd_fencing_p fp; 3836 union drbd_state os, ns; 3837 int rv = SS_UNKNOWN_ERROR; 3838 unsigned int i; 3839 3840 if (mdev->state.conn == C_STANDALONE) 3841 return; 3842 3843 /* We are about to start the cleanup after connection loss. 3844 * Make sure drbd_make_request knows about that. 3845 * Usually we should be in some network failure state already, 3846 * but just in case we are not, we fix it up here. 3847 */ 3848 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); 3849 3850 /* asender does not clean up anything. it must not interfere, either */ 3851 drbd_thread_stop(&mdev->asender); 3852 drbd_free_sock(mdev); 3853 3854 /* wait for current activity to cease. */ 3855 spin_lock_irq(&mdev->req_lock); 3856 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee); 3857 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee); 3858 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee); 3859 spin_unlock_irq(&mdev->req_lock); 3860 3861 /* We do not have data structures that would allow us to 3862 * get the rs_pending_cnt down to 0 again. 3863 * * On C_SYNC_TARGET we do not have any data structures describing 3864 * the pending RSDataRequest's we have sent. 3865 * * On C_SYNC_SOURCE there is no data structure that tracks 3866 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 3867 * And no, it is not the sum of the reference counts in the 3868 * resync_LRU. The resync_LRU tracks the whole operation including 3869 * the disk-IO, while the rs_pending_cnt only tracks the blocks 3870 * on the fly. */ 3871 drbd_rs_cancel_all(mdev); 3872 mdev->rs_total = 0; 3873 mdev->rs_failed = 0; 3874 atomic_set(&mdev->rs_pending_cnt, 0); 3875 wake_up(&mdev->misc_wait); 3876 3877 /* make sure syncer is stopped and w_resume_next_sg queued */ 3878 del_timer_sync(&mdev->resync_timer); 3879 resync_timer_fn((unsigned long)mdev); 3880 3881 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 3882 * w_make_resync_request etc. which may still be on the worker queue 3883 * to be "canceled" */ 3884 drbd_flush_workqueue(mdev); 3885 3886 /* This also does reclaim_net_ee(). 
If we do this too early, we might 3887 * miss some resync ee and pages.*/ 3888 drbd_process_done_ee(mdev); 3889 3890 kfree(mdev->p_uuid); 3891 mdev->p_uuid = NULL; 3892 3893 if (!is_susp(mdev->state)) 3894 tl_clear(mdev); 3895 3896 dev_info(DEV, "Connection closed\n"); 3897 3898 drbd_md_sync(mdev); 3899 3900 fp = FP_DONT_CARE; 3901 if (get_ldev(mdev)) { 3902 fp = mdev->ldev->dc.fencing; 3903 put_ldev(mdev); 3904 } 3905 3906 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) 3907 drbd_try_outdate_peer_async(mdev); 3908 3909 spin_lock_irq(&mdev->req_lock); 3910 os = mdev->state; 3911 if (os.conn >= C_UNCONNECTED) { 3912 /* Do not restart in case we are C_DISCONNECTING */ 3913 ns = os; 3914 ns.conn = C_UNCONNECTED; 3915 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 3916 } 3917 spin_unlock_irq(&mdev->req_lock); 3918 3919 if (os.conn == C_DISCONNECTING) { 3920 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0); 3921 3922 crypto_free_hash(mdev->cram_hmac_tfm); 3923 mdev->cram_hmac_tfm = NULL; 3924 3925 kfree(mdev->net_conf); 3926 mdev->net_conf = NULL; 3927 drbd_request_state(mdev, NS(conn, C_STANDALONE)); 3928 } 3929 3930 /* serialize with bitmap writeout triggered by the state change, 3931 * if any. */ 3932 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags)); 3933 3934 /* tcp_close and release of sendpage pages can be deferred. I don't 3935 * want to use SO_LINGER, because apparently it can be deferred for 3936 * more than 20 seconds (longest time I checked). 3937 * 3938 * Actually we don't care for exactly when the network stack does its 3939 * put_page(), but release our reference on these pages right here. 3940 */ 3941 i = drbd_release_ee(mdev, &mdev->net_ee); 3942 if (i) 3943 dev_info(DEV, "net_ee not empty, killed %u entries\n", i); 3944 i = atomic_read(&mdev->pp_in_use_by_net); 3945 if (i) 3946 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i); 3947 i = atomic_read(&mdev->pp_in_use); 3948 if (i) 3949 dev_info(DEV, "pp_in_use = %d, expected 0\n", i); 3950 3951 D_ASSERT(list_empty(&mdev->read_ee)); 3952 D_ASSERT(list_empty(&mdev->active_ee)); 3953 D_ASSERT(list_empty(&mdev->sync_ee)); 3954 D_ASSERT(list_empty(&mdev->done_ee)); 3955 3956 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 3957 atomic_set(&mdev->current_epoch->epoch_size, 0); 3958 D_ASSERT(list_empty(&mdev->current_epoch->list)); 3959 } 3960 3961 /* 3962 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 3963 * we can agree on is stored in agreed_pro_version. 3964 * 3965 * feature flags and the reserved array should be enough room for future 3966 * enhancements of the handshake protocol, and possible plugins... 3967 * 3968 * for now, they are expected to be zero, but ignored. 3969 */ 3970 static int drbd_send_handshake(struct drbd_conf *mdev) 3971 { 3972 /* ASSERT current == mdev->receiver ... */ 3973 struct p_handshake *p = &mdev->data.sbuf.handshake; 3974 int ok; 3975 3976 if (mutex_lock_interruptible(&mdev->data.mutex)) { 3977 dev_err(DEV, "interrupted during initial handshake\n"); 3978 return 0; /* interrupted. not ok. 
*/
3979 }
3980
3981 if (mdev->data.socket == NULL) {
3982 mutex_unlock(&mdev->data.mutex);
3983 return 0;
3984 }
3985
3986 memset(p, 0, sizeof(*p));
3987 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3990 (struct p_header80 *)p, sizeof(*p), 0 );
3991 mutex_unlock(&mdev->data.mutex);
3992 return ok;
3993 }
3994
3995 /*
3996 * return values:
3997 * 1 yes, we have a valid connection
3998 * 0 oops, did not work out, please try again
3999 * -1 peer speaks a different language,
4000 * no point in trying again, please go standalone.
4001 */
4002 static int drbd_do_handshake(struct drbd_conf *mdev)
4003 {
4004 /* ASSERT current == mdev->receiver ... */
4005 struct p_handshake *p = &mdev->data.rbuf.handshake;
4006 const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007 unsigned int length;
4008 enum drbd_packets cmd;
4009 int rv;
4010
4011 rv = drbd_send_handshake(mdev);
4012 if (!rv)
4013 return 0;
4014
4015 rv = drbd_recv_header(mdev, &cmd, &length);
4016 if (!rv)
4017 return 0;
4018
4019 if (cmd != P_HAND_SHAKE) {
4020 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4021 cmdname(cmd), cmd);
4022 return -1;
4023 }
4024
4025 if (length != expect) {
4026 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4027 expect, length);
4028 return -1;
4029 }
4030
4031 rv = drbd_recv(mdev, &p->head.payload, expect);
4032
4033 if (rv != expect) {
4034 if (!signal_pending(current))
4035 dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4036 return 0;
4037 }
4038
4039 p->protocol_min = be32_to_cpu(p->protocol_min);
4040 p->protocol_max = be32_to_cpu(p->protocol_max);
4041 if (p->protocol_max == 0)
4042 p->protocol_max = p->protocol_min;
4043
4044 if (PRO_VERSION_MAX < p->protocol_min ||
4045 PRO_VERSION_MIN > p->protocol_max)
4046 goto incompat;
4047
4048 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049
4050 dev_info(DEV, "Handshake successful: "
4051 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052
4053 return 1;
4054
4055 incompat:
4056 dev_err(DEV, "incompatible DRBD dialects: "
4057 "I support %d-%d, peer supports %d-%d\n",
4058 PRO_VERSION_MIN, PRO_VERSION_MAX,
4059 p->protocol_min, p->protocol_max);
4060 return -1;
4061 }
4062
4063 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4065 {
4066 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4067 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4068 return -1;
4069 }
4070 #else
4071 #define CHALLENGE_LEN 64
4072
4073 /* Return value:
4074 1 - auth succeeded,
4075 0 - failed, try again (network error),
4076 -1 - auth failed, don't try again.
4077 */
4078
4079 static int drbd_do_auth(struct drbd_conf *mdev)
4080 {
4081 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes...
*/
4082 struct scatterlist sg;
4083 char *response = NULL;
4084 char *right_response = NULL;
4085 char *peers_ch = NULL;
4086 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087 unsigned int resp_size;
4088 struct hash_desc desc;
4089 enum drbd_packets cmd;
4090 unsigned int length;
4091 int rv;
4092
4093 desc.tfm = mdev->cram_hmac_tfm;
4094 desc.flags = 0;
4095
4096 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097 (u8 *)mdev->net_conf->shared_secret, key_len);
4098 if (rv) {
4099 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4100 rv = -1;
4101 goto fail;
4102 }
4103
4104 get_random_bytes(my_challenge, CHALLENGE_LEN);
4105
4106 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107 if (!rv)
4108 goto fail;
4109
4110 rv = drbd_recv_header(mdev, &cmd, &length);
4111 if (!rv)
4112 goto fail;
4113
4114 if (cmd != P_AUTH_CHALLENGE) {
4115 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4116 cmdname(cmd), cmd);
4117 rv = 0;
4118 goto fail;
4119 }
4120
4121 if (length > CHALLENGE_LEN * 2) {
4122 dev_err(DEV, "AuthChallenge payload too big.\n");
4123 rv = -1;
4124 goto fail;
4125 }
4126
4127 peers_ch = kmalloc(length, GFP_NOIO);
4128 if (peers_ch == NULL) {
4129 dev_err(DEV, "kmalloc of peers_ch failed\n");
4130 rv = -1;
4131 goto fail;
4132 }
4133
4134 rv = drbd_recv(mdev, peers_ch, length);
4135
4136 if (rv != length) {
4137 if (!signal_pending(current))
4138 dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4139 rv = 0;
4140 goto fail;
4141 }
4142
4143 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144 response = kmalloc(resp_size, GFP_NOIO);
4145 if (response == NULL) {
4146 dev_err(DEV, "kmalloc of response failed\n");
4147 rv = -1;
4148 goto fail;
4149 }
4150
4151 sg_init_table(&sg, 1);
4152 sg_set_buf(&sg, peers_ch, length);
4153
4154 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4155 if (rv) {
4156 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4157 rv = -1;
4158 goto fail;
4159 }
4160
4161 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4162 if (!rv)
4163 goto fail;
4164
4165 rv = drbd_recv_header(mdev, &cmd, &length);
4166 if (!rv)
4167 goto fail;
4168
4169 if (cmd != P_AUTH_RESPONSE) {
4170 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4171 cmdname(cmd), cmd);
4172 rv = 0;
4173 goto fail;
4174 }
4175
4176 if (length != resp_size) {
4177 dev_err(DEV, "AuthResponse payload has the wrong size.\n");
4178 rv = 0;
4179 goto fail;
4180 }
4181
4182 rv = drbd_recv(mdev, response, resp_size);
4183
4184 if (rv != resp_size) {
4185 if (!signal_pending(current))
4186 dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4187 rv = 0;
4188 goto fail;
4189 }
4190
4191 right_response = kmalloc(resp_size, GFP_NOIO);
4192 if (right_response == NULL) {
4193 dev_err(DEV, "kmalloc of right_response failed\n");
4194 rv = -1;
4195 goto fail;
4196 }
4197
4198 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4199
4200 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4201 if (rv) {
4202 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4203 rv = -1;
4204 goto fail;
4205 }
4206
4207 rv = !memcmp(response, right_response, resp_size);
4208
4209 if (rv)
4210 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211 resp_size, mdev->net_conf->cram_hmac_alg);
4212 else
4213 rv = -1;
4214
4215 fail:
4216 kfree(peers_ch);
4217 kfree(response);
4218 kfree(right_response);
4219
4220 return rv;
4221 }
4222 #endif
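
/*
 * Summary of the exchange implemented by drbd_do_auth(): both peers run the
 * same symmetric challenge/response protocol:
 *   1. send P_AUTH_CHALLENGE carrying a locally generated random challenge;
 *   2. receive the peer's P_AUTH_CHALLENGE and its challenge bytes;
 *   3. send P_AUTH_RESPONSE carrying HMAC(shared_secret, peer's challenge);
 *   4. receive the peer's P_AUTH_RESPONSE and compare it against the locally
 *      computed HMAC(shared_secret, my_challenge); a mismatch fails with -1.
 */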
4223 4224 int drbdd_init(struct drbd_thread *thi) 4225 { 4226 struct drbd_conf *mdev = thi->mdev; 4227 unsigned int minor = mdev_to_minor(mdev); 4228 int h; 4229 4230 sprintf(current->comm, "drbd%d_receiver", minor); 4231 4232 dev_info(DEV, "receiver (re)started\n"); 4233 4234 do { 4235 h = drbd_connect(mdev); 4236 if (h == 0) { 4237 drbd_disconnect(mdev); 4238 schedule_timeout_interruptible(HZ); 4239 } 4240 if (h == -1) { 4241 dev_warn(DEV, "Discarding network configuration.\n"); 4242 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 4243 } 4244 } while (h == 0); 4245 4246 if (h > 0) { 4247 if (get_net_conf(mdev)) { 4248 drbdd(mdev); 4249 put_net_conf(mdev); 4250 } 4251 } 4252 4253 drbd_disconnect(mdev); 4254 4255 dev_info(DEV, "receiver terminated\n"); 4256 return 0; 4257 } 4258 4259 /* ********* acknowledge sender ******** */ 4260 4261 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h) 4262 { 4263 struct p_req_state_reply *p = (struct p_req_state_reply *)h; 4264 4265 int retcode = be32_to_cpu(p->retcode); 4266 4267 if (retcode >= SS_SUCCESS) { 4268 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags); 4269 } else { 4270 set_bit(CL_ST_CHG_FAIL, &mdev->flags); 4271 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n", 4272 drbd_set_st_err_str(retcode), retcode); 4273 } 4274 wake_up(&mdev->state_wait); 4275 4276 return true; 4277 } 4278 4279 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h) 4280 { 4281 return drbd_send_ping_ack(mdev); 4282 4283 } 4284 4285 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h) 4286 { 4287 /* restore idle timeout */ 4288 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ; 4289 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags)) 4290 wake_up(&mdev->misc_wait); 4291 4292 return true; 4293 } 4294 4295 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h) 4296 { 4297 struct p_block_ack *p = (struct p_block_ack *)h; 4298 sector_t sector = be64_to_cpu(p->sector); 4299 int blksize = be32_to_cpu(p->blksize); 4300 4301 D_ASSERT(mdev->agreed_pro_version >= 89); 4302 4303 update_peer_seq(mdev, be32_to_cpu(p->seq_num)); 4304 4305 if (get_ldev(mdev)) { 4306 drbd_rs_complete_io(mdev, sector); 4307 drbd_set_in_sync(mdev, sector, blksize); 4308 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 4309 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 4310 put_ldev(mdev); 4311 } 4312 dec_rs_pending(mdev); 4313 atomic_add(blksize >> 9, &mdev->rs_sect_in); 4314 4315 return true; 4316 } 4317 4318 /* when we receive the ACK for a write request, 4319 * verify that we actually know about it */ 4320 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev, 4321 u64 id, sector_t sector) 4322 { 4323 struct hlist_head *slot = tl_hash_slot(mdev, sector); 4324 struct hlist_node *n; 4325 struct drbd_request *req; 4326 4327 hlist_for_each_entry(req, n, slot, collision) { 4328 if ((unsigned long)req == (unsigned long)id) { 4329 if (req->sector != sector) { 4330 dev_err(DEV, "_ack_id_to_req: found req %p but it has " 4331 "wrong sector (%llus versus %llus)\n", req, 4332 (unsigned long long)req->sector, 4333 (unsigned long long)sector); 4334 break; 4335 } 4336 return req; 4337 } 4338 } 4339 return NULL; 4340 } 4341 4342 typedef struct drbd_request *(req_validator_fn) 4343 (struct drbd_conf *mdev, u64 id, sector_t sector); 4344 4345 static int validate_req_change_req_state(struct drbd_conf *mdev, 4346 u64 id, sector_t sector, req_validator_fn validator, 4347 const char *func, enum 
drbd_req_event what)
4348 {
4349 struct drbd_request *req;
4350 struct bio_and_error m;
4351
4352 spin_lock_irq(&mdev->req_lock);
4353 req = validator(mdev, id, sector);
4354 if (unlikely(!req)) {
4355 spin_unlock_irq(&mdev->req_lock);
4356
4357 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358 (void *)(unsigned long)id, (unsigned long long)sector);
4359 return false;
4360 }
4361 __req_mod(req, what, &m);
4362 spin_unlock_irq(&mdev->req_lock);
4363
4364 if (m.bio)
4365 complete_master_bio(mdev, &m);
4366 return true;
4367 }
4368
4369 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4370 {
4371 struct p_block_ack *p = (struct p_block_ack *)h;
4372 sector_t sector = be64_to_cpu(p->sector);
4373 int blksize = be32_to_cpu(p->blksize);
4374 enum drbd_req_event what;
4375
4376 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377
4378 if (is_syncer_block_id(p->block_id)) {
4379 drbd_set_in_sync(mdev, sector, blksize);
4380 dec_rs_pending(mdev);
4381 return true;
4382 }
4383 switch (be16_to_cpu(h->command)) {
4384 case P_RS_WRITE_ACK:
4385 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386 what = write_acked_by_peer_and_sis;
4387 break;
4388 case P_WRITE_ACK:
4389 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390 what = write_acked_by_peer;
4391 break;
4392 case P_RECV_ACK:
4393 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394 what = recv_acked_by_peer;
4395 break;
4396 case P_DISCARD_ACK:
4397 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398 what = conflict_discarded_by_peer;
4399 break;
4400 default:
4401 D_ASSERT(0);
4402 return false;
4403 }
4404
4405 return validate_req_change_req_state(mdev, p->block_id, sector,
4406 _ack_id_to_req, __func__ , what);
4407 }
4408
4409 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4410 {
4411 struct p_block_ack *p = (struct p_block_ack *)h;
4412 sector_t sector = be64_to_cpu(p->sector);
4413 int size = be32_to_cpu(p->blksize);
4414 struct drbd_request *req;
4415 struct bio_and_error m;
4416
4417 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418
4419 if (is_syncer_block_id(p->block_id)) {
4420 dec_rs_pending(mdev);
4421 drbd_rs_failed_io(mdev, sector, size);
4422 return true;
4423 }
4424
4425 spin_lock_irq(&mdev->req_lock);
4426 req = _ack_id_to_req(mdev, p->block_id, sector);
4427 if (!req) {
4428 spin_unlock_irq(&mdev->req_lock);
4429 if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430 mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432 The master bio might already be completed, therefore the
4433 request is no longer in the collision hash.
4434 => Do not try to validate block_id as request. */
4435 /* In Protocol B we might already have got a P_RECV_ACK
4436 but then get a P_NEG_ACK afterwards.
*/ 4437 drbd_set_out_of_sync(mdev, sector, size); 4438 return true; 4439 } else { 4440 dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__, 4441 (void *)(unsigned long)p->block_id, (unsigned long long)sector); 4442 return false; 4443 } 4444 } 4445 __req_mod(req, neg_acked, &m); 4446 spin_unlock_irq(&mdev->req_lock); 4447 4448 if (m.bio) 4449 complete_master_bio(mdev, &m); 4450 return true; 4451 } 4452 4453 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h) 4454 { 4455 struct p_block_ack *p = (struct p_block_ack *)h; 4456 sector_t sector = be64_to_cpu(p->sector); 4457 4458 update_peer_seq(mdev, be32_to_cpu(p->seq_num)); 4459 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n", 4460 (unsigned long long)sector, be32_to_cpu(p->blksize)); 4461 4462 return validate_req_change_req_state(mdev, p->block_id, sector, 4463 _ar_id_to_req, __func__ , neg_acked); 4464 } 4465 4466 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h) 4467 { 4468 sector_t sector; 4469 int size; 4470 struct p_block_ack *p = (struct p_block_ack *)h; 4471 4472 sector = be64_to_cpu(p->sector); 4473 size = be32_to_cpu(p->blksize); 4474 4475 update_peer_seq(mdev, be32_to_cpu(p->seq_num)); 4476 4477 dec_rs_pending(mdev); 4478 4479 if (get_ldev_if_state(mdev, D_FAILED)) { 4480 drbd_rs_complete_io(mdev, sector); 4481 switch (be16_to_cpu(h->command)) { 4482 case P_NEG_RS_DREPLY: 4483 drbd_rs_failed_io(mdev, sector, size); 4484 case P_RS_CANCEL: 4485 break; 4486 default: 4487 D_ASSERT(0); 4488 put_ldev(mdev); 4489 return false; 4490 } 4491 put_ldev(mdev); 4492 } 4493 4494 return true; 4495 } 4496 4497 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h) 4498 { 4499 struct p_barrier_ack *p = (struct p_barrier_ack *)h; 4500 4501 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size)); 4502 4503 if (mdev->state.conn == C_AHEAD && 4504 atomic_read(&mdev->ap_in_flight) == 0 && 4505 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) { 4506 mdev->start_resync_timer.expires = jiffies + HZ; 4507 add_timer(&mdev->start_resync_timer); 4508 } 4509 4510 return true; 4511 } 4512 4513 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h) 4514 { 4515 struct p_block_ack *p = (struct p_block_ack *)h; 4516 struct drbd_work *w; 4517 sector_t sector; 4518 int size; 4519 4520 sector = be64_to_cpu(p->sector); 4521 size = be32_to_cpu(p->blksize); 4522 4523 update_peer_seq(mdev, be32_to_cpu(p->seq_num)); 4524 4525 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 4526 drbd_ov_oos_found(mdev, sector, size); 4527 else 4528 ov_oos_print(mdev); 4529 4530 if (!get_ldev(mdev)) 4531 return true; 4532 4533 drbd_rs_complete_io(mdev, sector); 4534 dec_rs_pending(mdev); 4535 4536 --mdev->ov_left; 4537 4538 /* let's advance progress step marks only for every other megabyte */ 4539 if ((mdev->ov_left & 0x200) == 0x200) 4540 drbd_advance_rs_marks(mdev, mdev->ov_left); 4541 4542 if (mdev->ov_left == 0) { 4543 w = kmalloc(sizeof(*w), GFP_NOIO); 4544 if (w) { 4545 w->cb = w_ov_finished; 4546 drbd_queue_work_front(&mdev->data.work, w); 4547 } else { 4548 dev_err(DEV, "kmalloc(w) failed."); 4549 ov_oos_print(mdev); 4550 drbd_resync_finished(mdev); 4551 } 4552 } 4553 put_ldev(mdev); 4554 return true; 4555 } 4556 4557 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h) 4558 { 4559 return true; 4560 } 4561 4562 struct asender_cmd { 4563 size_t pkt_size; 4564 int (*process)(struct drbd_conf *mdev, struct p_header80 *h); 4565 }; 4566 4567 static struct 
asender_cmd *get_asender_cmd(int cmd) 4568 { 4569 static struct asender_cmd asender_tbl[] = { 4570 /* anything missing from this table is in 4571 * the drbd_cmd_handler (drbd_default_handler) table, 4572 * see the beginning of drbdd() */ 4573 [P_PING] = { sizeof(struct p_header80), got_Ping }, 4574 [P_PING_ACK] = { sizeof(struct p_header80), got_PingAck }, 4575 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 4576 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 4577 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 4578 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 4579 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck }, 4580 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply }, 4581 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply}, 4582 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult }, 4583 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, 4584 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, 4585 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, 4586 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, 4587 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply}, 4588 [P_MAX_CMD] = { 0, NULL }, 4589 }; 4590 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL) 4591 return NULL; 4592 return &asender_tbl[cmd]; 4593 } 4594 4595 int drbd_asender(struct drbd_thread *thi) 4596 { 4597 struct drbd_conf *mdev = thi->mdev; 4598 struct p_header80 *h = &mdev->meta.rbuf.header.h80; 4599 struct asender_cmd *cmd = NULL; 4600 4601 int rv, len; 4602 void *buf = h; 4603 int received = 0; 4604 int expect = sizeof(struct p_header80); 4605 int empty; 4606 int ping_timeout_active = 0; 4607 4608 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev)); 4609 4610 current->policy = SCHED_RR; /* Make this a realtime task! */ 4611 current->rt_priority = 2; /* more important than all other tasks */ 4612 4613 while (get_t_state(thi) == Running) { 4614 drbd_thread_current_set_cpu(mdev); 4615 if (test_and_clear_bit(SEND_PING, &mdev->flags)) { 4616 ERR_IF(!drbd_send_ping(mdev)) goto reconnect; 4617 mdev->meta.socket->sk->sk_rcvtimeo = 4618 mdev->net_conf->ping_timeo*HZ/10; 4619 ping_timeout_active = 1; 4620 } 4621 4622 /* conditionally cork; 4623 * it may hurt latency if we cork without much to send */ 4624 if (!mdev->net_conf->no_cork && 4625 3 < atomic_read(&mdev->unacked_cnt)) 4626 drbd_tcp_cork(mdev->meta.socket); 4627 while (1) { 4628 clear_bit(SIGNAL_ASENDER, &mdev->flags); 4629 flush_signals(current); 4630 if (!drbd_process_done_ee(mdev)) 4631 goto reconnect; 4632 /* to avoid race with newly queued ACKs */ 4633 set_bit(SIGNAL_ASENDER, &mdev->flags); 4634 spin_lock_irq(&mdev->req_lock); 4635 empty = list_empty(&mdev->done_ee); 4636 spin_unlock_irq(&mdev->req_lock); 4637 /* new ack may have been queued right here, 4638 * but then there is also a signal pending, 4639 * and we start over... */ 4640 if (empty) 4641 break; 4642 } 4643 /* but unconditionally uncork unless disabled */ 4644 if (!mdev->net_conf->no_cork) 4645 drbd_tcp_uncork(mdev->meta.socket); 4646 4647 /* short circuit, recv_msg would return EINTR anyways. 
*/ 4648 if (signal_pending(current)) 4649 continue; 4650 4651 rv = drbd_recv_short(mdev, mdev->meta.socket, 4652 buf, expect-received, 0); 4653 clear_bit(SIGNAL_ASENDER, &mdev->flags); 4654 4655 flush_signals(current); 4656 4657 /* Note: 4658 * -EINTR (on meta) we got a signal 4659 * -EAGAIN (on meta) rcvtimeo expired 4660 * -ECONNRESET other side closed the connection 4661 * -ERESTARTSYS (on data) we got a signal 4662 * rv < 0 other than above: unexpected error! 4663 * rv == expected: full header or command 4664 * rv < expected: "woken" by signal during receive 4665 * rv == 0 : "connection shut down by peer" 4666 */ 4667 if (likely(rv > 0)) { 4668 received += rv; 4669 buf += rv; 4670 } else if (rv == 0) { 4671 dev_err(DEV, "meta connection shut down by peer.\n"); 4672 goto reconnect; 4673 } else if (rv == -EAGAIN) { 4674 /* If the data socket received something meanwhile, 4675 * that is good enough: peer is still alive. */ 4676 if (time_after(mdev->last_received, 4677 jiffies - mdev->meta.socket->sk->sk_rcvtimeo)) 4678 continue; 4679 if (ping_timeout_active) { 4680 dev_err(DEV, "PingAck did not arrive in time.\n"); 4681 goto reconnect; 4682 } 4683 set_bit(SEND_PING, &mdev->flags); 4684 continue; 4685 } else if (rv == -EINTR) { 4686 continue; 4687 } else { 4688 dev_err(DEV, "sock_recvmsg returned %d\n", rv); 4689 goto reconnect; 4690 } 4691 4692 if (received == expect && cmd == NULL) { 4693 if (unlikely(h->magic != BE_DRBD_MAGIC)) { 4694 dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n", 4695 be32_to_cpu(h->magic), 4696 be16_to_cpu(h->command), 4697 be16_to_cpu(h->length)); 4698 goto reconnect; 4699 } 4700 cmd = get_asender_cmd(be16_to_cpu(h->command)); 4701 len = be16_to_cpu(h->length); 4702 if (unlikely(cmd == NULL)) { 4703 dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n", 4704 be32_to_cpu(h->magic), 4705 be16_to_cpu(h->command), 4706 be16_to_cpu(h->length)); 4707 goto disconnect; 4708 } 4709 expect = cmd->pkt_size; 4710 ERR_IF(len != expect-sizeof(struct p_header80)) 4711 goto reconnect; 4712 } 4713 if (received == expect) { 4714 mdev->last_received = jiffies; 4715 D_ASSERT(cmd != NULL); 4716 if (!cmd->process(mdev, h)) 4717 goto reconnect; 4718 4719 /* the idle_timeout (ping-int) 4720 * has been restored in got_PingAck() */ 4721 if (cmd == get_asender_cmd(P_PING_ACK)) 4722 ping_timeout_active = 0; 4723 4724 buf = h; 4725 received = 0; 4726 expect = sizeof(struct p_header80); 4727 cmd = NULL; 4728 } 4729 } 4730 4731 if (0) { 4732 reconnect: 4733 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); 4734 drbd_md_sync(mdev); 4735 } 4736 if (0) { 4737 disconnect: 4738 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 4739 drbd_md_sync(mdev); 4740 } 4741 clear_bit(SIGNAL_ASENDER, &mdev->flags); 4742 4743 D_ASSERT(mdev->state.conn < C_CONNECTED); 4744 dev_info(DEV, "asender terminated\n"); 4745 4746 return 0; 4747 } 4748
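
/*
 * Illustrative sketch (not compiled, not part of the DRBD protocol code):
 * the receive loop in drbd_asender() above first accumulates a fixed-size
 * header, then switches "expect" to the full packet size of the decoded
 * command and keeps accumulating until the packet is complete.  The helper
 * below shows that accumulation pattern in isolation; recv_fn() is a
 * hypothetical stand-in for drbd_recv_short().
 */
#if 0
static int example_accumulate_packet(int (*recv_fn)(void *buf, int len),
				     char *buf, int header_size, int pkt_size)
{
	int received = 0;
	int expect = header_size;

	while (received < expect) {
		int rv = recv_fn(buf + received, expect - received);
		if (rv <= 0)
			return -1; /* error, signal, or connection closed */
		received += rv;
		if (received == header_size)
			expect = pkt_size; /* header complete: wait for the full packet */
	}
	return received;
}
#endif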