1 /* 2 drbd_receiver.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 */ 24 25 26 #include <linux/module.h> 27 28 #include <asm/uaccess.h> 29 #include <net/sock.h> 30 31 #include <linux/drbd.h> 32 #include <linux/fs.h> 33 #include <linux/file.h> 34 #include <linux/in.h> 35 #include <linux/mm.h> 36 #include <linux/memcontrol.h> 37 #include <linux/mm_inline.h> 38 #include <linux/slab.h> 39 #include <linux/pkt_sched.h> 40 #define __KERNEL_SYSCALLS__ 41 #include <linux/unistd.h> 42 #include <linux/vmalloc.h> 43 #include <linux/random.h> 44 #include <linux/string.h> 45 #include <linux/scatterlist.h> 46 #include "drbd_int.h" 47 #include "drbd_protocol.h" 48 #include "drbd_req.h" 49 50 #include "drbd_vli.h" 51 52 struct packet_info { 53 enum drbd_packet cmd; 54 unsigned int size; 55 unsigned int vnr; 56 void *data; 57 }; 58 59 enum finish_epoch { 60 FE_STILL_LIVE, 61 FE_DESTROYED, 62 FE_RECYCLED, 63 }; 64 65 static int drbd_do_features(struct drbd_connection *connection); 66 static int drbd_do_auth(struct drbd_connection *connection); 67 static int drbd_disconnected(struct drbd_peer_device *); 68 69 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event); 70 static int e_end_block(struct drbd_work *, int); 71 72 73 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 74 75 /* 76 * some helper functions to deal with single linked page lists, 77 * page->private being our "next" pointer. 78 */ 79 80 /* If at least n pages are linked at head, get n pages off. 81 * Otherwise, don't modify head, and return NULL. 82 * Locking is the responsibility of the caller. 83 */ 84 static struct page *page_chain_del(struct page **head, int n) 85 { 86 struct page *page; 87 struct page *tmp; 88 89 BUG_ON(!n); 90 BUG_ON(!head); 91 92 page = *head; 93 94 if (!page) 95 return NULL; 96 97 while (page) { 98 tmp = page_chain_next(page); 99 if (--n == 0) 100 break; /* found sufficient pages */ 101 if (tmp == NULL) 102 /* insufficient pages, don't use any of them. */ 103 return NULL; 104 page = tmp; 105 } 106 107 /* add end of list marker for the returned list */ 108 set_page_private(page, 0); 109 /* actual return value, and adjustment of head */ 110 page = *head; 111 *head = tmp; 112 return page; 113 } 114 115 /* may be used outside of locks to find the tail of a (usually short) 116 * "private" page chain, before adding it back to a global chain head 117 * with page_chain_add() under a spinlock. 
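 *
 * (Illustrative sketch, added by the editor and not part of the original
 *  comment; "chain", "tail" and "n" are made-up local names.  A caller
 *  holding a private chain typically splices it back like this --
 *
 *	int n;
 *	struct page *tail = page_chain_tail(chain, &n);
 *	spin_lock(&drbd_pp_lock);
 *	page_chain_add(&drbd_pp_pool, chain, tail);
 *	drbd_pp_vacant += n;
 *	spin_unlock(&drbd_pp_lock);
 *
 *  which is essentially the pattern used by __drbd_alloc_pages() and
 *  drbd_free_pages() below.)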
*/ 118 static struct page *page_chain_tail(struct page *page, int *len) 119 { 120 struct page *tmp; 121 int i = 1; 122 while ((tmp = page_chain_next(page))) 123 ++i, page = tmp; 124 if (len) 125 *len = i; 126 return page; 127 } 128 129 static int page_chain_free(struct page *page) 130 { 131 struct page *tmp; 132 int i = 0; 133 page_chain_for_each_safe(page, tmp) { 134 put_page(page); 135 ++i; 136 } 137 return i; 138 } 139 140 static void page_chain_add(struct page **head, 141 struct page *chain_first, struct page *chain_last) 142 { 143 #if 1 144 struct page *tmp; 145 tmp = page_chain_tail(chain_first, NULL); 146 BUG_ON(tmp != chain_last); 147 #endif 148 149 /* add chain to head */ 150 set_page_private(chain_last, (unsigned long)*head); 151 *head = chain_first; 152 } 153 154 static struct page *__drbd_alloc_pages(struct drbd_device *device, 155 unsigned int number) 156 { 157 struct page *page = NULL; 158 struct page *tmp = NULL; 159 unsigned int i = 0; 160 161 /* Yes, testing drbd_pp_vacant outside the lock is racy. 162 * So what. It saves a spin_lock. */ 163 if (drbd_pp_vacant >= number) { 164 spin_lock(&drbd_pp_lock); 165 page = page_chain_del(&drbd_pp_pool, number); 166 if (page) 167 drbd_pp_vacant -= number; 168 spin_unlock(&drbd_pp_lock); 169 if (page) 170 return page; 171 } 172 173 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 174 * "criss-cross" setup, that might cause write-out on some other DRBD, 175 * which in turn might block on the other node at this very place. */ 176 for (i = 0; i < number; i++) { 177 tmp = alloc_page(GFP_TRY); 178 if (!tmp) 179 break; 180 set_page_private(tmp, (unsigned long)page); 181 page = tmp; 182 } 183 184 if (i == number) 185 return page; 186 187 /* Not enough pages immediately available this time. 188 * No need to jump around here, drbd_alloc_pages will retry this 189 * function "soon". */ 190 if (page) { 191 tmp = page_chain_tail(page, NULL); 192 spin_lock(&drbd_pp_lock); 193 page_chain_add(&drbd_pp_pool, page, tmp); 194 drbd_pp_vacant += i; 195 spin_unlock(&drbd_pp_lock); 196 } 197 return NULL; 198 } 199 200 static void reclaim_finished_net_peer_reqs(struct drbd_device *device, 201 struct list_head *to_be_freed) 202 { 203 struct drbd_peer_request *peer_req, *tmp; 204 205 /* The EEs are always appended to the end of the list. Since 206 they are sent in order over the wire, they have to finish 207 in order. As soon as we see the first not finished we can 208 stop to examine the list... */ 209 210 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) { 211 if (drbd_peer_req_has_active_page(peer_req)) 212 break; 213 list_move(&peer_req->w.list, to_be_freed); 214 } 215 } 216 217 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device) 218 { 219 LIST_HEAD(reclaimed); 220 struct drbd_peer_request *peer_req, *t; 221 222 spin_lock_irq(&device->resource->req_lock); 223 reclaim_finished_net_peer_reqs(device, &reclaimed); 224 spin_unlock_irq(&device->resource->req_lock); 225 226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) 227 drbd_free_net_peer_req(device, peer_req); 228 } 229 230 /** 231 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled) 232 * @device: DRBD device. 233 * @number: number of pages requested 234 * @retry: whether to retry, if not enough pages are available right now 235 * 236 * Tries to allocate number pages, first from our own page pool, then from 237 * the kernel, unless this allocation would exceed the max_buffers setting. 
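 *
 * (Usage sketch, added for illustration; "payload_size" is a made-up name:
 *
 *	unsigned int nr_pages = (payload_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 *	struct page *page = drbd_alloc_pages(peer_device, nr_pages, true);
 *
 *  page is NULL only if we were interrupted by a signal, or if retry was
 *  false and memory is tight; the chain is later handed back through
 *  drbd_free_pages(), as done in drbd_alloc_peer_req() and
 *  __drbd_free_peer_req() below.)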
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyway. */
	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq context, as that may deadlock: see drbd_alloc_pages.
 * It is also called from inside a spin_lock_irq(&resource->req_lock) section.
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ?
"pp_in_use_by_net" : "pp_in_use", i); 315 wake_up(&drbd_pp_wait); 316 } 317 318 /* 319 You need to hold the req_lock: 320 _drbd_wait_ee_list_empty() 321 322 You must not have the req_lock: 323 drbd_free_peer_req() 324 drbd_alloc_peer_req() 325 drbd_free_peer_reqs() 326 drbd_ee_fix_bhs() 327 drbd_finish_peer_reqs() 328 drbd_clear_done_ee() 329 drbd_wait_ee_list_empty() 330 */ 331 332 struct drbd_peer_request * 333 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 334 unsigned int data_size, gfp_t gfp_mask) __must_hold(local) 335 { 336 struct drbd_device *device = peer_device->device; 337 struct drbd_peer_request *peer_req; 338 struct page *page = NULL; 339 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; 340 341 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE)) 342 return NULL; 343 344 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 345 if (!peer_req) { 346 if (!(gfp_mask & __GFP_NOWARN)) 347 drbd_err(device, "%s: allocation failed\n", __func__); 348 return NULL; 349 } 350 351 if (data_size) { 352 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT)); 353 if (!page) 354 goto fail; 355 } 356 357 drbd_clear_interval(&peer_req->i); 358 peer_req->i.size = data_size; 359 peer_req->i.sector = sector; 360 peer_req->i.local = false; 361 peer_req->i.waiting = false; 362 363 peer_req->epoch = NULL; 364 peer_req->peer_device = peer_device; 365 peer_req->pages = page; 366 atomic_set(&peer_req->pending_bios, 0); 367 peer_req->flags = 0; 368 /* 369 * The block_id is opaque to the receiver. It is not endianness 370 * converted, and sent back to the sender unchanged. 371 */ 372 peer_req->block_id = id; 373 374 return peer_req; 375 376 fail: 377 mempool_free(peer_req, drbd_ee_mempool); 378 return NULL; 379 } 380 381 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req, 382 int is_net) 383 { 384 if (peer_req->flags & EE_HAS_DIGEST) 385 kfree(peer_req->digest); 386 drbd_free_pages(device, peer_req->pages, is_net); 387 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); 388 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 389 mempool_free(peer_req, drbd_ee_mempool); 390 } 391 392 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) 393 { 394 LIST_HEAD(work_list); 395 struct drbd_peer_request *peer_req, *t; 396 int count = 0; 397 int is_net = list == &device->net_ee; 398 399 spin_lock_irq(&device->resource->req_lock); 400 list_splice_init(list, &work_list); 401 spin_unlock_irq(&device->resource->req_lock); 402 403 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 404 __drbd_free_peer_req(device, peer_req, is_net); 405 count++; 406 } 407 return count; 408 } 409 410 /* 411 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier. 412 */ 413 static int drbd_finish_peer_reqs(struct drbd_device *device) 414 { 415 LIST_HEAD(work_list); 416 LIST_HEAD(reclaimed); 417 struct drbd_peer_request *peer_req, *t; 418 int err = 0; 419 420 spin_lock_irq(&device->resource->req_lock); 421 reclaim_finished_net_peer_reqs(device, &reclaimed); 422 list_splice_init(&device->done_ee, &work_list); 423 spin_unlock_irq(&device->resource->req_lock); 424 425 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) 426 drbd_free_net_peer_req(device, peer_req); 427 428 /* possible callbacks here: 429 * e_end_block, and e_end_resync_block, e_send_superseded. 430 * all ignore the last argument. 
431 */ 432 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 433 int err2; 434 435 /* list_del not necessary, next/prev members not touched */ 436 err2 = peer_req->w.cb(&peer_req->w, !!err); 437 if (!err) 438 err = err2; 439 drbd_free_peer_req(device, peer_req); 440 } 441 wake_up(&device->ee_wait); 442 443 return err; 444 } 445 446 static void _drbd_wait_ee_list_empty(struct drbd_device *device, 447 struct list_head *head) 448 { 449 DEFINE_WAIT(wait); 450 451 /* avoids spin_lock/unlock 452 * and calling prepare_to_wait in the fast path */ 453 while (!list_empty(head)) { 454 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE); 455 spin_unlock_irq(&device->resource->req_lock); 456 io_schedule(); 457 finish_wait(&device->ee_wait, &wait); 458 spin_lock_irq(&device->resource->req_lock); 459 } 460 } 461 462 static void drbd_wait_ee_list_empty(struct drbd_device *device, 463 struct list_head *head) 464 { 465 spin_lock_irq(&device->resource->req_lock); 466 _drbd_wait_ee_list_empty(device, head); 467 spin_unlock_irq(&device->resource->req_lock); 468 } 469 470 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags) 471 { 472 struct kvec iov = { 473 .iov_base = buf, 474 .iov_len = size, 475 }; 476 struct msghdr msg = { 477 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL) 478 }; 479 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags); 480 } 481 482 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size) 483 { 484 int rv; 485 486 rv = drbd_recv_short(connection->data.socket, buf, size, 0); 487 488 if (rv < 0) { 489 if (rv == -ECONNRESET) 490 drbd_info(connection, "sock was reset by peer\n"); 491 else if (rv != -ERESTARTSYS) 492 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 493 } else if (rv == 0) { 494 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 495 long t; 496 rcu_read_lock(); 497 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 498 rcu_read_unlock(); 499 500 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t); 501 502 if (t) 503 goto out; 504 } 505 drbd_info(connection, "sock was shut down by peer\n"); 506 } 507 508 if (rv != size) 509 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD); 510 511 out: 512 return rv; 513 } 514 515 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size) 516 { 517 int err; 518 519 err = drbd_recv(connection, buf, size); 520 if (err != size) { 521 if (err >= 0) 522 err = -EIO; 523 } else 524 err = 0; 525 return err; 526 } 527 528 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size) 529 { 530 int err; 531 532 err = drbd_recv_all(connection, buf, size); 533 if (err && !signal_pending(current)) 534 drbd_warn(connection, "short read (expected size %d)\n", (int)size); 535 return err; 536 } 537 538 /* quoting tcp(7): 539 * On individual connections, the socket buffer size must be set prior to the 540 * listen(2) or connect(2) calls in order to have it take effect. 541 * This is our wrapper to do so. 
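 *
 * (Added illustration: both connection paths below follow the pattern
 *
 *	sock_create_kern(..., &sock);
 *	drbd_setbufsize(sock, nc->sndbuf_size, nc->rcvbuf_size);
 *	sock->ops->bind(...);
 *	then ->connect() or ->listen()
 *
 *  and a size of 0 simply keeps the kernel's default buffer size.)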
542 */ 543 static void drbd_setbufsize(struct socket *sock, unsigned int snd, 544 unsigned int rcv) 545 { 546 /* open coded SO_SNDBUF, SO_RCVBUF */ 547 if (snd) { 548 sock->sk->sk_sndbuf = snd; 549 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 550 } 551 if (rcv) { 552 sock->sk->sk_rcvbuf = rcv; 553 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 554 } 555 } 556 557 static struct socket *drbd_try_connect(struct drbd_connection *connection) 558 { 559 const char *what; 560 struct socket *sock; 561 struct sockaddr_in6 src_in6; 562 struct sockaddr_in6 peer_in6; 563 struct net_conf *nc; 564 int err, peer_addr_len, my_addr_len; 565 int sndbuf_size, rcvbuf_size, connect_int; 566 int disconnect_on_error = 1; 567 568 rcu_read_lock(); 569 nc = rcu_dereference(connection->net_conf); 570 if (!nc) { 571 rcu_read_unlock(); 572 return NULL; 573 } 574 sndbuf_size = nc->sndbuf_size; 575 rcvbuf_size = nc->rcvbuf_size; 576 connect_int = nc->connect_int; 577 rcu_read_unlock(); 578 579 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6)); 580 memcpy(&src_in6, &connection->my_addr, my_addr_len); 581 582 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6) 583 src_in6.sin6_port = 0; 584 else 585 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ 586 587 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6)); 588 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len); 589 590 what = "sock_create_kern"; 591 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family, 592 SOCK_STREAM, IPPROTO_TCP, &sock); 593 if (err < 0) { 594 sock = NULL; 595 goto out; 596 } 597 598 sock->sk->sk_rcvtimeo = 599 sock->sk->sk_sndtimeo = connect_int * HZ; 600 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size); 601 602 /* explicitly bind to the configured IP as source IP 603 * for the outgoing connections. 604 * This is needed for multihomed hosts and to be 605 * able to use lo: interfaces for drbd. 606 * Make sure to use 0 as port number, so linux selects 607 * a free one dynamically. 608 */ 609 what = "bind before connect"; 610 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len); 611 if (err < 0) 612 goto out; 613 614 /* connect may fail, peer not yet available. 615 * stay C_WF_CONNECTION, don't go Disconnecting! 
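	 *
	 * (Added note: from here on, "expected" failures such as ETIMEDOUT,
	 *  ECONNREFUSED or EHOSTUNREACH -- see the switch under the out:
	 *  label -- merely lead to another connect attempt, while unexpected
	 *  errors still force the connection to C_DISCONNECTING.)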
*/ 616 disconnect_on_error = 0; 617 what = "connect"; 618 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0); 619 620 out: 621 if (err < 0) { 622 if (sock) { 623 sock_release(sock); 624 sock = NULL; 625 } 626 switch (-err) { 627 /* timeout, busy, signal pending */ 628 case ETIMEDOUT: case EAGAIN: case EINPROGRESS: 629 case EINTR: case ERESTARTSYS: 630 /* peer not (yet) available, network problem */ 631 case ECONNREFUSED: case ENETUNREACH: 632 case EHOSTDOWN: case EHOSTUNREACH: 633 disconnect_on_error = 0; 634 break; 635 default: 636 drbd_err(connection, "%s failed, err = %d\n", what, err); 637 } 638 if (disconnect_on_error) 639 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 640 } 641 642 return sock; 643 } 644 645 struct accept_wait_data { 646 struct drbd_connection *connection; 647 struct socket *s_listen; 648 struct completion door_bell; 649 void (*original_sk_state_change)(struct sock *sk); 650 651 }; 652 653 static void drbd_incoming_connection(struct sock *sk) 654 { 655 struct accept_wait_data *ad = sk->sk_user_data; 656 void (*state_change)(struct sock *sk); 657 658 state_change = ad->original_sk_state_change; 659 if (sk->sk_state == TCP_ESTABLISHED) 660 complete(&ad->door_bell); 661 state_change(sk); 662 } 663 664 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad) 665 { 666 int err, sndbuf_size, rcvbuf_size, my_addr_len; 667 struct sockaddr_in6 my_addr; 668 struct socket *s_listen; 669 struct net_conf *nc; 670 const char *what; 671 672 rcu_read_lock(); 673 nc = rcu_dereference(connection->net_conf); 674 if (!nc) { 675 rcu_read_unlock(); 676 return -EIO; 677 } 678 sndbuf_size = nc->sndbuf_size; 679 rcvbuf_size = nc->rcvbuf_size; 680 rcu_read_unlock(); 681 682 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6)); 683 memcpy(&my_addr, &connection->my_addr, my_addr_len); 684 685 what = "sock_create_kern"; 686 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family, 687 SOCK_STREAM, IPPROTO_TCP, &s_listen); 688 if (err) { 689 s_listen = NULL; 690 goto out; 691 } 692 693 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 694 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size); 695 696 what = "bind before listen"; 697 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len); 698 if (err < 0) 699 goto out; 700 701 ad->s_listen = s_listen; 702 write_lock_bh(&s_listen->sk->sk_callback_lock); 703 ad->original_sk_state_change = s_listen->sk->sk_state_change; 704 s_listen->sk->sk_state_change = drbd_incoming_connection; 705 s_listen->sk->sk_user_data = ad; 706 write_unlock_bh(&s_listen->sk->sk_callback_lock); 707 708 what = "listen"; 709 err = s_listen->ops->listen(s_listen, 5); 710 if (err < 0) 711 goto out; 712 713 return 0; 714 out: 715 if (s_listen) 716 sock_release(s_listen); 717 if (err < 0) { 718 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 719 drbd_err(connection, "%s failed, err = %d\n", what, err); 720 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 721 } 722 } 723 724 return -EIO; 725 } 726 727 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad) 728 { 729 write_lock_bh(&sk->sk_callback_lock); 730 sk->sk_state_change = ad->original_sk_state_change; 731 sk->sk_user_data = NULL; 732 write_unlock_bh(&sk->sk_callback_lock); 733 } 734 735 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad) 736 { 737 int timeo, 
connect_int, err = 0; 738 struct socket *s_estab = NULL; 739 struct net_conf *nc; 740 741 rcu_read_lock(); 742 nc = rcu_dereference(connection->net_conf); 743 if (!nc) { 744 rcu_read_unlock(); 745 return NULL; 746 } 747 connect_int = nc->connect_int; 748 rcu_read_unlock(); 749 750 timeo = connect_int * HZ; 751 /* 28.5% random jitter */ 752 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7; 753 754 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo); 755 if (err <= 0) 756 return NULL; 757 758 err = kernel_accept(ad->s_listen, &s_estab, 0); 759 if (err < 0) { 760 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 761 drbd_err(connection, "accept failed, err = %d\n", err); 762 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 763 } 764 } 765 766 if (s_estab) 767 unregister_state_change(s_estab->sk, ad); 768 769 return s_estab; 770 } 771 772 static int decode_header(struct drbd_connection *, void *, struct packet_info *); 773 774 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock, 775 enum drbd_packet cmd) 776 { 777 if (!conn_prepare_command(connection, sock)) 778 return -EIO; 779 return conn_send_command(connection, sock, cmd, 0, NULL, 0); 780 } 781 782 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock) 783 { 784 unsigned int header_size = drbd_header_size(connection); 785 struct packet_info pi; 786 int err; 787 788 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0); 789 if (err != header_size) { 790 if (err >= 0) 791 err = -EIO; 792 return err; 793 } 794 err = decode_header(connection, connection->data.rbuf, &pi); 795 if (err) 796 return err; 797 return pi.cmd; 798 } 799 800 /** 801 * drbd_socket_okay() - Free the socket if its connection is not okay 802 * @sock: pointer to the pointer to the socket. 803 */ 804 static int drbd_socket_okay(struct socket **sock) 805 { 806 int rr; 807 char tb[4]; 808 809 if (!*sock) 810 return false; 811 812 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); 813 814 if (rr > 0 || rr == -EAGAIN) { 815 return true; 816 } else { 817 sock_release(*sock); 818 *sock = NULL; 819 return false; 820 } 821 } 822 /* Gets called if a connection is established, or if a new minor gets created 823 in a connection */ 824 int drbd_connected(struct drbd_peer_device *peer_device) 825 { 826 struct drbd_device *device = peer_device->device; 827 int err; 828 829 atomic_set(&device->packet_seq, 0); 830 device->peer_seq = 0; 831 832 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ? 833 &peer_device->connection->cstate_mutex : 834 &device->own_state_mutex; 835 836 err = drbd_send_sync_param(peer_device); 837 if (!err) 838 err = drbd_send_sizes(peer_device, 0, 0); 839 if (!err) 840 err = drbd_send_uuids(peer_device); 841 if (!err) 842 err = drbd_send_current_state(peer_device); 843 clear_bit(USE_DEGR_WFC_T, &device->flags); 844 clear_bit(RESIZE_PENDING, &device->flags); 845 atomic_set(&device->ap_in_flight, 0); 846 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */ 847 return err; 848 } 849 850 /* 851 * return values: 852 * 1 yes, we have a valid connection 853 * 0 oops, did not work out, please try again 854 * -1 peer talks different language, 855 * no point in trying again, please go standalone. 856 * -2 We do not have a network config... 
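 *
 * (Added summary: the handshake below establishes two TCP sockets, "sock"
 *  for data and "msock" for meta-data/ACK traffic; a return value of 0
 *  simply means "try conn_connect() again", while -1 and -2 indicate that
 *  retrying cannot help with the current configuration.)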
857 */ 858 static int conn_connect(struct drbd_connection *connection) 859 { 860 struct drbd_socket sock, msock; 861 struct drbd_peer_device *peer_device; 862 struct net_conf *nc; 863 int vnr, timeout, h, ok; 864 bool discard_my_data; 865 enum drbd_state_rv rv; 866 struct accept_wait_data ad = { 867 .connection = connection, 868 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell), 869 }; 870 871 clear_bit(DISCONNECT_SENT, &connection->flags); 872 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS) 873 return -2; 874 875 mutex_init(&sock.mutex); 876 sock.sbuf = connection->data.sbuf; 877 sock.rbuf = connection->data.rbuf; 878 sock.socket = NULL; 879 mutex_init(&msock.mutex); 880 msock.sbuf = connection->meta.sbuf; 881 msock.rbuf = connection->meta.rbuf; 882 msock.socket = NULL; 883 884 /* Assume that the peer only understands protocol 80 until we know better. */ 885 connection->agreed_pro_version = 80; 886 887 if (prepare_listen_socket(connection, &ad)) 888 return 0; 889 890 do { 891 struct socket *s; 892 893 s = drbd_try_connect(connection); 894 if (s) { 895 if (!sock.socket) { 896 sock.socket = s; 897 send_first_packet(connection, &sock, P_INITIAL_DATA); 898 } else if (!msock.socket) { 899 clear_bit(RESOLVE_CONFLICTS, &connection->flags); 900 msock.socket = s; 901 send_first_packet(connection, &msock, P_INITIAL_META); 902 } else { 903 drbd_err(connection, "Logic error in conn_connect()\n"); 904 goto out_release_sockets; 905 } 906 } 907 908 if (sock.socket && msock.socket) { 909 rcu_read_lock(); 910 nc = rcu_dereference(connection->net_conf); 911 timeout = nc->ping_timeo * HZ / 10; 912 rcu_read_unlock(); 913 schedule_timeout_interruptible(timeout); 914 ok = drbd_socket_okay(&sock.socket); 915 ok = drbd_socket_okay(&msock.socket) && ok; 916 if (ok) 917 break; 918 } 919 920 retry: 921 s = drbd_wait_for_connect(connection, &ad); 922 if (s) { 923 int fp = receive_first_packet(connection, s); 924 drbd_socket_okay(&sock.socket); 925 drbd_socket_okay(&msock.socket); 926 switch (fp) { 927 case P_INITIAL_DATA: 928 if (sock.socket) { 929 drbd_warn(connection, "initial packet S crossed\n"); 930 sock_release(sock.socket); 931 sock.socket = s; 932 goto randomize; 933 } 934 sock.socket = s; 935 break; 936 case P_INITIAL_META: 937 set_bit(RESOLVE_CONFLICTS, &connection->flags); 938 if (msock.socket) { 939 drbd_warn(connection, "initial packet M crossed\n"); 940 sock_release(msock.socket); 941 msock.socket = s; 942 goto randomize; 943 } 944 msock.socket = s; 945 break; 946 default: 947 drbd_warn(connection, "Error receiving initial packet\n"); 948 sock_release(s); 949 randomize: 950 if (prandom_u32() & 1) 951 goto retry; 952 } 953 } 954 955 if (connection->cstate <= C_DISCONNECTING) 956 goto out_release_sockets; 957 if (signal_pending(current)) { 958 flush_signals(current); 959 smp_rmb(); 960 if (get_t_state(&connection->receiver) == EXITING) 961 goto out_release_sockets; 962 } 963 964 ok = drbd_socket_okay(&sock.socket); 965 ok = drbd_socket_okay(&msock.socket) && ok; 966 } while (!ok); 967 968 if (ad.s_listen) 969 sock_release(ad.s_listen); 970 971 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 972 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 973 974 sock.socket->sk->sk_allocation = GFP_NOIO; 975 msock.socket->sk->sk_allocation = GFP_NOIO; 976 977 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; 978 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE; 979 980 /* NOT YET ... 
981 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10; 982 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 983 * first set it to the P_CONNECTION_FEATURES timeout, 984 * which we set to 4x the configured ping_timeout. */ 985 rcu_read_lock(); 986 nc = rcu_dereference(connection->net_conf); 987 988 sock.socket->sk->sk_sndtimeo = 989 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10; 990 991 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ; 992 timeout = nc->timeout * HZ / 10; 993 discard_my_data = nc->discard_my_data; 994 rcu_read_unlock(); 995 996 msock.socket->sk->sk_sndtimeo = timeout; 997 998 /* we don't want delays. 999 * we use TCP_CORK where appropriate, though */ 1000 drbd_tcp_nodelay(sock.socket); 1001 drbd_tcp_nodelay(msock.socket); 1002 1003 connection->data.socket = sock.socket; 1004 connection->meta.socket = msock.socket; 1005 connection->last_received = jiffies; 1006 1007 h = drbd_do_features(connection); 1008 if (h <= 0) 1009 return h; 1010 1011 if (connection->cram_hmac_tfm) { 1012 /* drbd_request_state(device, NS(conn, WFAuth)); */ 1013 switch (drbd_do_auth(connection)) { 1014 case -1: 1015 drbd_err(connection, "Authentication of peer failed\n"); 1016 return -1; 1017 case 0: 1018 drbd_err(connection, "Authentication of peer failed, trying again.\n"); 1019 return 0; 1020 } 1021 } 1022 1023 connection->data.socket->sk->sk_sndtimeo = timeout; 1024 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 1025 1026 if (drbd_send_protocol(connection) == -EOPNOTSUPP) 1027 return -1; 1028 1029 set_bit(STATE_SENT, &connection->flags); 1030 1031 rcu_read_lock(); 1032 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1033 struct drbd_device *device = peer_device->device; 1034 kref_get(&device->kref); 1035 rcu_read_unlock(); 1036 1037 /* Prevent a race between resync-handshake and 1038 * being promoted to Primary. 1039 * 1040 * Grab and release the state mutex, so we know that any current 1041 * drbd_set_role() is finished, and any incoming drbd_set_role 1042 * will see the STATE_SENT flag, and wait for it to be cleared. 1043 */ 1044 mutex_lock(device->state_mutex); 1045 mutex_unlock(device->state_mutex); 1046 1047 if (discard_my_data) 1048 set_bit(DISCARD_MY_DATA, &device->flags); 1049 else 1050 clear_bit(DISCARD_MY_DATA, &device->flags); 1051 1052 drbd_connected(peer_device); 1053 kref_put(&device->kref, drbd_destroy_device); 1054 rcu_read_lock(); 1055 } 1056 rcu_read_unlock(); 1057 1058 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE); 1059 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) { 1060 clear_bit(STATE_SENT, &connection->flags); 1061 return 0; 1062 } 1063 1064 drbd_thread_start(&connection->asender); 1065 1066 mutex_lock(&connection->resource->conf_update); 1067 /* The discard_my_data flag is a single-shot modifier to the next 1068 * connection attempt, the handshake of which is now well underway. 1069 * No need for rcu style copying of the whole struct 1070 * just to clear a single value. 
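	 *
	 * (Added pointer: the flag was already sampled into the local
	 *  discard_my_data variable above and applied per volume via the
	 *  DISCARD_MY_DATA device flag; only the stored configuration bit is
	 *  reset here, under conf_update.)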
*/ 1071 connection->net_conf->discard_my_data = 0; 1072 mutex_unlock(&connection->resource->conf_update); 1073 1074 return h; 1075 1076 out_release_sockets: 1077 if (ad.s_listen) 1078 sock_release(ad.s_listen); 1079 if (sock.socket) 1080 sock_release(sock.socket); 1081 if (msock.socket) 1082 sock_release(msock.socket); 1083 return -1; 1084 } 1085 1086 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi) 1087 { 1088 unsigned int header_size = drbd_header_size(connection); 1089 1090 if (header_size == sizeof(struct p_header100) && 1091 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) { 1092 struct p_header100 *h = header; 1093 if (h->pad != 0) { 1094 drbd_err(connection, "Header padding is not zero\n"); 1095 return -EINVAL; 1096 } 1097 pi->vnr = be16_to_cpu(h->volume); 1098 pi->cmd = be16_to_cpu(h->command); 1099 pi->size = be32_to_cpu(h->length); 1100 } else if (header_size == sizeof(struct p_header95) && 1101 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) { 1102 struct p_header95 *h = header; 1103 pi->cmd = be16_to_cpu(h->command); 1104 pi->size = be32_to_cpu(h->length); 1105 pi->vnr = 0; 1106 } else if (header_size == sizeof(struct p_header80) && 1107 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) { 1108 struct p_header80 *h = header; 1109 pi->cmd = be16_to_cpu(h->command); 1110 pi->size = be16_to_cpu(h->length); 1111 pi->vnr = 0; 1112 } else { 1113 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n", 1114 be32_to_cpu(*(__be32 *)header), 1115 connection->agreed_pro_version); 1116 return -EINVAL; 1117 } 1118 pi->data = header + header_size; 1119 return 0; 1120 } 1121 1122 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi) 1123 { 1124 void *buffer = connection->data.rbuf; 1125 int err; 1126 1127 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection)); 1128 if (err) 1129 return err; 1130 1131 err = decode_header(connection, buffer, pi); 1132 connection->last_received = jiffies; 1133 1134 return err; 1135 } 1136 1137 static void drbd_flush(struct drbd_connection *connection) 1138 { 1139 int rv; 1140 struct drbd_peer_device *peer_device; 1141 int vnr; 1142 1143 if (connection->write_ordering >= WO_bdev_flush) { 1144 rcu_read_lock(); 1145 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1146 struct drbd_device *device = peer_device->device; 1147 1148 if (!get_ldev(device)) 1149 continue; 1150 kref_get(&device->kref); 1151 rcu_read_unlock(); 1152 1153 rv = blkdev_issue_flush(device->ldev->backing_bdev, 1154 GFP_NOIO, NULL); 1155 if (rv) { 1156 drbd_info(device, "local disk flush failed with status %d\n", rv); 1157 /* would rather check on EOPNOTSUPP, but that is not reliable. 1158 * don't try again for ANY return value != 0 1159 * if (rv == -EOPNOTSUPP) */ 1160 drbd_bump_write_ordering(connection, WO_drain_io); 1161 } 1162 put_ldev(device); 1163 kref_put(&device->kref, drbd_destroy_device); 1164 1165 rcu_read_lock(); 1166 if (rv) 1167 break; 1168 } 1169 rcu_read_unlock(); 1170 } 1171 } 1172 1173 /** 1174 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. 1175 * @device: DRBD device. 1176 * @epoch: Epoch object. 1177 * @ev: Epoch event. 
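 *
 * (Added explanation: an epoch counts as finished once its epoch_size is
 *  non-zero, no request in it is still active, and either a barrier number
 *  has been received or EV_CLEANUP is set.  At that point a P_BARRIER_ACK
 *  is sent (unless cleaning up) and the epoch is either destroyed or, if it
 *  is still the current one, recycled -- see the FE_* values returned.)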
1178 */ 1179 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection, 1180 struct drbd_epoch *epoch, 1181 enum epoch_event ev) 1182 { 1183 int epoch_size; 1184 struct drbd_epoch *next_epoch; 1185 enum finish_epoch rv = FE_STILL_LIVE; 1186 1187 spin_lock(&connection->epoch_lock); 1188 do { 1189 next_epoch = NULL; 1190 1191 epoch_size = atomic_read(&epoch->epoch_size); 1192 1193 switch (ev & ~EV_CLEANUP) { 1194 case EV_PUT: 1195 atomic_dec(&epoch->active); 1196 break; 1197 case EV_GOT_BARRIER_NR: 1198 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); 1199 break; 1200 case EV_BECAME_LAST: 1201 /* nothing to do*/ 1202 break; 1203 } 1204 1205 if (epoch_size != 0 && 1206 atomic_read(&epoch->active) == 0 && 1207 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) { 1208 if (!(ev & EV_CLEANUP)) { 1209 spin_unlock(&connection->epoch_lock); 1210 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size); 1211 spin_lock(&connection->epoch_lock); 1212 } 1213 #if 0 1214 /* FIXME: dec unacked on connection, once we have 1215 * something to count pending connection packets in. */ 1216 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) 1217 dec_unacked(epoch->connection); 1218 #endif 1219 1220 if (connection->current_epoch != epoch) { 1221 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list); 1222 list_del(&epoch->list); 1223 ev = EV_BECAME_LAST | (ev & EV_CLEANUP); 1224 connection->epochs--; 1225 kfree(epoch); 1226 1227 if (rv == FE_STILL_LIVE) 1228 rv = FE_DESTROYED; 1229 } else { 1230 epoch->flags = 0; 1231 atomic_set(&epoch->epoch_size, 0); 1232 /* atomic_set(&epoch->active, 0); is already zero */ 1233 if (rv == FE_STILL_LIVE) 1234 rv = FE_RECYCLED; 1235 } 1236 } 1237 1238 if (!next_epoch) 1239 break; 1240 1241 epoch = next_epoch; 1242 } while (1); 1243 1244 spin_unlock(&connection->epoch_lock); 1245 1246 return rv; 1247 } 1248 1249 /** 1250 * drbd_bump_write_ordering() - Fall back to an other write ordering method 1251 * @connection: DRBD connection. 1252 * @wo: Write ordering method to try. 1253 */ 1254 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo) 1255 { 1256 struct disk_conf *dc; 1257 struct drbd_peer_device *peer_device; 1258 enum write_ordering_e pwo; 1259 int vnr; 1260 static char *write_ordering_str[] = { 1261 [WO_none] = "none", 1262 [WO_drain_io] = "drain", 1263 [WO_bdev_flush] = "flush", 1264 }; 1265 1266 pwo = connection->write_ordering; 1267 wo = min(pwo, wo); 1268 rcu_read_lock(); 1269 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1270 struct drbd_device *device = peer_device->device; 1271 1272 if (!get_ldev_if_state(device, D_ATTACHING)) 1273 continue; 1274 dc = rcu_dereference(device->ldev->disk_conf); 1275 1276 if (wo == WO_bdev_flush && !dc->disk_flushes) 1277 wo = WO_drain_io; 1278 if (wo == WO_drain_io && !dc->disk_drain) 1279 wo = WO_none; 1280 put_ldev(device); 1281 } 1282 rcu_read_unlock(); 1283 connection->write_ordering = wo; 1284 if (pwo != connection->write_ordering || wo == WO_bdev_flush) 1285 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]); 1286 } 1287 1288 /** 1289 * drbd_submit_peer_request() 1290 * @device: DRBD device. 1291 * @peer_req: peer request 1292 * @rw: flag field, see bio->bi_rw 1293 * 1294 * May spread the pages to multiple bios, 1295 * depending on bio_add_page restrictions. 
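 *
 * (Added sketch of the loop below, in pseudo code:
 *
 *	bio = bio_alloc(GFP_NOIO, nr_pages);
 *	for each page in peer_req->pages:
 *		if (!bio_add_page(bio, page, len, 0))
 *			open a new bio, chain it via bi_next, retry the page;
 *	submit the chained bios only after every page found a home;
 *
 *  so on failure nothing has been submitted yet and the partial bio chain
 *  can simply be bio_put() again.)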
1296 * 1297 * Returns 0 if all bios have been submitted, 1298 * -ENOMEM if we could not allocate enough bios, 1299 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a 1300 * single page to an empty bio (which should never happen and likely indicates 1301 * that the lower level IO stack is in some way broken). This has been observed 1302 * on certain Xen deployments. 1303 */ 1304 /* TODO allocate from our own bio_set. */ 1305 int drbd_submit_peer_request(struct drbd_device *device, 1306 struct drbd_peer_request *peer_req, 1307 const unsigned rw, const int fault_type) 1308 { 1309 struct bio *bios = NULL; 1310 struct bio *bio; 1311 struct page *page = peer_req->pages; 1312 sector_t sector = peer_req->i.sector; 1313 unsigned ds = peer_req->i.size; 1314 unsigned n_bios = 0; 1315 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT; 1316 int err = -ENOMEM; 1317 1318 /* In most cases, we will only need one bio. But in case the lower 1319 * level restrictions happen to be different at this offset on this 1320 * side than those of the sending peer, we may need to submit the 1321 * request in more than one bio. 1322 * 1323 * Plain bio_alloc is good enough here, this is no DRBD internally 1324 * generated bio, but a bio allocated on behalf of the peer. 1325 */ 1326 next_bio: 1327 bio = bio_alloc(GFP_NOIO, nr_pages); 1328 if (!bio) { 1329 drbd_err(device, "submit_ee: Allocation of a bio failed\n"); 1330 goto fail; 1331 } 1332 /* > peer_req->i.sector, unless this is the first bio */ 1333 bio->bi_iter.bi_sector = sector; 1334 bio->bi_bdev = device->ldev->backing_bdev; 1335 bio->bi_rw = rw; 1336 bio->bi_private = peer_req; 1337 bio->bi_end_io = drbd_peer_request_endio; 1338 1339 bio->bi_next = bios; 1340 bios = bio; 1341 ++n_bios; 1342 1343 page_chain_for_each(page) { 1344 unsigned len = min_t(unsigned, ds, PAGE_SIZE); 1345 if (!bio_add_page(bio, page, len, 0)) { 1346 /* A single page must always be possible! 1347 * But in case it fails anyways, 1348 * we deal with it, and complain (below). */ 1349 if (bio->bi_vcnt == 0) { 1350 drbd_err(device, 1351 "bio_add_page failed for len=%u, " 1352 "bi_vcnt=0 (bi_sector=%llu)\n", 1353 len, (uint64_t)bio->bi_iter.bi_sector); 1354 err = -ENOSPC; 1355 goto fail; 1356 } 1357 goto next_bio; 1358 } 1359 ds -= len; 1360 sector += len >> 9; 1361 --nr_pages; 1362 } 1363 D_ASSERT(device, page == NULL); 1364 D_ASSERT(device, ds == 0); 1365 1366 atomic_set(&peer_req->pending_bios, n_bios); 1367 do { 1368 bio = bios; 1369 bios = bios->bi_next; 1370 bio->bi_next = NULL; 1371 1372 drbd_generic_make_request(device, fault_type, bio); 1373 } while (bios); 1374 return 0; 1375 1376 fail: 1377 while (bios) { 1378 bio = bios; 1379 bios = bios->bi_next; 1380 bio_put(bio); 1381 } 1382 return err; 1383 } 1384 1385 static void drbd_remove_epoch_entry_interval(struct drbd_device *device, 1386 struct drbd_peer_request *peer_req) 1387 { 1388 struct drbd_interval *i = &peer_req->i; 1389 1390 drbd_remove_interval(&device->write_requests, i); 1391 drbd_clear_interval(i); 1392 1393 /* Wake up any processes waiting for this peer request to complete. 
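	 *
	 * (Added, hedged note: i->waiting is typically set by a conflicting
	 *  writer that went to sleep in drbd_wait_misc(); see
	 *  handle_write_conflicts() further down.)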
*/ 1394 if (i->waiting) 1395 wake_up(&device->misc_wait); 1396 } 1397 1398 static void conn_wait_active_ee_empty(struct drbd_connection *connection) 1399 { 1400 struct drbd_peer_device *peer_device; 1401 int vnr; 1402 1403 rcu_read_lock(); 1404 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1405 struct drbd_device *device = peer_device->device; 1406 1407 kref_get(&device->kref); 1408 rcu_read_unlock(); 1409 drbd_wait_ee_list_empty(device, &device->active_ee); 1410 kref_put(&device->kref, drbd_destroy_device); 1411 rcu_read_lock(); 1412 } 1413 rcu_read_unlock(); 1414 } 1415 1416 static struct drbd_peer_device * 1417 conn_peer_device(struct drbd_connection *connection, int volume_number) 1418 { 1419 return idr_find(&connection->peer_devices, volume_number); 1420 } 1421 1422 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi) 1423 { 1424 int rv; 1425 struct p_barrier *p = pi->data; 1426 struct drbd_epoch *epoch; 1427 1428 /* FIXME these are unacked on connection, 1429 * not a specific (peer)device. 1430 */ 1431 connection->current_epoch->barrier_nr = p->barrier; 1432 connection->current_epoch->connection = connection; 1433 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR); 1434 1435 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from 1436 * the activity log, which means it would not be resynced in case the 1437 * R_PRIMARY crashes now. 1438 * Therefore we must send the barrier_ack after the barrier request was 1439 * completed. */ 1440 switch (connection->write_ordering) { 1441 case WO_none: 1442 if (rv == FE_RECYCLED) 1443 return 0; 1444 1445 /* receiver context, in the writeout path of the other node. 1446 * avoid potential distributed deadlock */ 1447 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1448 if (epoch) 1449 break; 1450 else 1451 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n"); 1452 /* Fall through */ 1453 1454 case WO_bdev_flush: 1455 case WO_drain_io: 1456 conn_wait_active_ee_empty(connection); 1457 drbd_flush(connection); 1458 1459 if (atomic_read(&connection->current_epoch->epoch_size)) { 1460 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1461 if (epoch) 1462 break; 1463 } 1464 1465 return 0; 1466 default: 1467 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering); 1468 return -EIO; 1469 } 1470 1471 epoch->flags = 0; 1472 atomic_set(&epoch->epoch_size, 0); 1473 atomic_set(&epoch->active, 0); 1474 1475 spin_lock(&connection->epoch_lock); 1476 if (atomic_read(&connection->current_epoch->epoch_size)) { 1477 list_add(&epoch->list, &connection->current_epoch->list); 1478 connection->current_epoch = epoch; 1479 connection->epochs++; 1480 } else { 1481 /* The current_epoch got recycled while we allocated this one... 
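		 *
		 * (Added note: this is the benign race with
		 *  drbd_may_finish_epoch(), which may have reset epoch_size
		 *  to 0 and recycled current_epoch while we were allocating;
		 *  dropping the freshly allocated epoch is fine.)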
		 */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;

	dgs = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	if (!data_size)
		return peer_req;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size>>9;
	return peer_req;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
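 *
 * (Clarifying addition: this is used on error paths -- for instance when
 *  the local disk is gone in receive_RSDataReply() -- so that the TCP
 *  stream stays aligned with the packet framing even though the payload
 *  is discarded.)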
1575 */ 1576 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) 1577 { 1578 struct page *page; 1579 int err = 0; 1580 void *data; 1581 1582 if (!data_size) 1583 return 0; 1584 1585 page = drbd_alloc_pages(peer_device, 1, 1); 1586 1587 data = kmap(page); 1588 while (data_size) { 1589 unsigned int len = min_t(int, data_size, PAGE_SIZE); 1590 1591 err = drbd_recv_all_warn(peer_device->connection, data, len); 1592 if (err) 1593 break; 1594 data_size -= len; 1595 } 1596 kunmap(page); 1597 drbd_free_pages(peer_device->device, page, 0); 1598 return err; 1599 } 1600 1601 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req, 1602 sector_t sector, int data_size) 1603 { 1604 struct bio_vec bvec; 1605 struct bvec_iter iter; 1606 struct bio *bio; 1607 int dgs, err, expect; 1608 void *dig_in = peer_device->connection->int_dig_in; 1609 void *dig_vv = peer_device->connection->int_dig_vv; 1610 1611 dgs = 0; 1612 if (peer_device->connection->peer_integrity_tfm) { 1613 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm); 1614 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs); 1615 if (err) 1616 return err; 1617 data_size -= dgs; 1618 } 1619 1620 /* optimistically update recv_cnt. if receiving fails below, 1621 * we disconnect anyways, and counters will be reset. */ 1622 peer_device->device->recv_cnt += data_size>>9; 1623 1624 bio = req->master_bio; 1625 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector); 1626 1627 bio_for_each_segment(bvec, bio, iter) { 1628 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset; 1629 expect = min_t(int, data_size, bvec.bv_len); 1630 err = drbd_recv_all_warn(peer_device->connection, mapped, expect); 1631 kunmap(bvec.bv_page); 1632 if (err) 1633 return err; 1634 data_size -= expect; 1635 } 1636 1637 if (dgs) { 1638 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv); 1639 if (memcmp(dig_in, dig_vv, dgs)) { 1640 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n"); 1641 return -EINVAL; 1642 } 1643 } 1644 1645 D_ASSERT(peer_device->device, data_size == 0); 1646 return 0; 1647 } 1648 1649 /* 1650 * e_end_resync_block() is called in asender context via 1651 * drbd_finish_peer_reqs(). 
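 *
 * (Added summary: on success the block is marked in sync and a
 *  P_RS_WRITE_ACK is sent; on EE_WAS_ERROR the failed I/O is recorded and
 *  a P_NEG_ACK goes out.  Either way dec_unacked() balances the
 *  inc_unacked() taken in recv_resync_read().)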
1652 */ 1653 static int e_end_resync_block(struct drbd_work *w, int unused) 1654 { 1655 struct drbd_peer_request *peer_req = 1656 container_of(w, struct drbd_peer_request, w); 1657 struct drbd_peer_device *peer_device = peer_req->peer_device; 1658 struct drbd_device *device = peer_device->device; 1659 sector_t sector = peer_req->i.sector; 1660 int err; 1661 1662 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1663 1664 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1665 drbd_set_in_sync(device, sector, peer_req->i.size); 1666 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req); 1667 } else { 1668 /* Record failure to sync */ 1669 drbd_rs_failed_io(device, sector, peer_req->i.size); 1670 1671 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1672 } 1673 dec_unacked(device); 1674 1675 return err; 1676 } 1677 1678 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector, 1679 int data_size) __releases(local) 1680 { 1681 struct drbd_device *device = peer_device->device; 1682 struct drbd_peer_request *peer_req; 1683 1684 peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size); 1685 if (!peer_req) 1686 goto fail; 1687 1688 dec_rs_pending(device); 1689 1690 inc_unacked(device); 1691 /* corresponding dec_unacked() in e_end_resync_block() 1692 * respective _drbd_clear_done_ee */ 1693 1694 peer_req->w.cb = e_end_resync_block; 1695 1696 spin_lock_irq(&device->resource->req_lock); 1697 list_add(&peer_req->w.list, &device->sync_ee); 1698 spin_unlock_irq(&device->resource->req_lock); 1699 1700 atomic_add(data_size >> 9, &device->rs_sect_ev); 1701 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0) 1702 return 0; 1703 1704 /* don't care for the reason here */ 1705 drbd_err(device, "submit failed, triggering re-connect\n"); 1706 spin_lock_irq(&device->resource->req_lock); 1707 list_del(&peer_req->w.list); 1708 spin_unlock_irq(&device->resource->req_lock); 1709 1710 drbd_free_peer_req(device, peer_req); 1711 fail: 1712 put_ldev(device); 1713 return -EIO; 1714 } 1715 1716 static struct drbd_request * 1717 find_request(struct drbd_device *device, struct rb_root *root, u64 id, 1718 sector_t sector, bool missing_ok, const char *func) 1719 { 1720 struct drbd_request *req; 1721 1722 /* Request object according to our peer */ 1723 req = (struct drbd_request *)(unsigned long)id; 1724 if (drbd_contains_interval(root, sector, &req->i) && req->i.local) 1725 return req; 1726 if (!missing_ok) { 1727 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func, 1728 (unsigned long)id, (unsigned long long)sector); 1729 } 1730 return NULL; 1731 } 1732 1733 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi) 1734 { 1735 struct drbd_peer_device *peer_device; 1736 struct drbd_device *device; 1737 struct drbd_request *req; 1738 sector_t sector; 1739 int err; 1740 struct p_data *p = pi->data; 1741 1742 peer_device = conn_peer_device(connection, pi->vnr); 1743 if (!peer_device) 1744 return -EIO; 1745 device = peer_device->device; 1746 1747 sector = be64_to_cpu(p->sector); 1748 1749 spin_lock_irq(&device->resource->req_lock); 1750 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__); 1751 spin_unlock_irq(&device->resource->req_lock); 1752 if (unlikely(!req)) 1753 return -EIO; 1754 1755 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid 1756 * special casing it there for the various failure cases. 
1757 * still no race with drbd_fail_pending_reads */ 1758 err = recv_dless_read(peer_device, req, sector, pi->size); 1759 if (!err) 1760 req_mod(req, DATA_RECEIVED); 1761 /* else: nothing. handled from drbd_disconnect... 1762 * I don't think we may complete this just yet 1763 * in case we are "on-disconnect: freeze" */ 1764 1765 return err; 1766 } 1767 1768 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi) 1769 { 1770 struct drbd_peer_device *peer_device; 1771 struct drbd_device *device; 1772 sector_t sector; 1773 int err; 1774 struct p_data *p = pi->data; 1775 1776 peer_device = conn_peer_device(connection, pi->vnr); 1777 if (!peer_device) 1778 return -EIO; 1779 device = peer_device->device; 1780 1781 sector = be64_to_cpu(p->sector); 1782 D_ASSERT(device, p->block_id == ID_SYNCER); 1783 1784 if (get_ldev(device)) { 1785 /* data is submitted to disk within recv_resync_read. 1786 * corresponding put_ldev done below on error, 1787 * or in drbd_peer_request_endio. */ 1788 err = recv_resync_read(peer_device, sector, pi->size); 1789 } else { 1790 if (__ratelimit(&drbd_ratelimit_state)) 1791 drbd_err(device, "Can not write resync data to local disk.\n"); 1792 1793 err = drbd_drain_block(peer_device, pi->size); 1794 1795 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 1796 } 1797 1798 atomic_add(pi->size >> 9, &device->rs_sect_in); 1799 1800 return err; 1801 } 1802 1803 static void restart_conflicting_writes(struct drbd_device *device, 1804 sector_t sector, int size) 1805 { 1806 struct drbd_interval *i; 1807 struct drbd_request *req; 1808 1809 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 1810 if (!i->local) 1811 continue; 1812 req = container_of(i, struct drbd_request, i); 1813 if (req->rq_state & RQ_LOCAL_PENDING || 1814 !(req->rq_state & RQ_POSTPONED)) 1815 continue; 1816 /* as it is RQ_POSTPONED, this will cause it to 1817 * be queued on the retry workqueue. */ 1818 __req_mod(req, CONFLICT_RESOLVED, NULL); 1819 } 1820 } 1821 1822 /* 1823 * e_end_block() is called in asender context via drbd_finish_peer_reqs(). 1824 */ 1825 static int e_end_block(struct drbd_work *w, int cancel) 1826 { 1827 struct drbd_peer_request *peer_req = 1828 container_of(w, struct drbd_peer_request, w); 1829 struct drbd_peer_device *peer_device = peer_req->peer_device; 1830 struct drbd_device *device = peer_device->device; 1831 sector_t sector = peer_req->i.sector; 1832 int err = 0, pcmd; 1833 1834 if (peer_req->flags & EE_SEND_WRITE_ACK) { 1835 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1836 pcmd = (device->state.conn >= C_SYNC_SOURCE && 1837 device->state.conn <= C_PAUSED_SYNC_T && 1838 peer_req->flags & EE_MAY_SET_IN_SYNC) ? 1839 P_RS_WRITE_ACK : P_WRITE_ACK; 1840 err = drbd_send_ack(peer_device, pcmd, peer_req); 1841 if (pcmd == P_RS_WRITE_ACK) 1842 drbd_set_in_sync(device, sector, peer_req->i.size); 1843 } else { 1844 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1845 /* we expect it to be marked out of sync anyways... 1846 * maybe assert this? */ 1847 } 1848 dec_unacked(device); 1849 } 1850 /* we delete from the conflict detection hash _after_ we sent out the 1851 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. 
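	 *
	 * (Added summary of the cleanup below: the interval is removed from
	 *  the write_requests tree, postponed conflicting local writes are
	 *  restarted if EE_RESTART_REQUESTS is set, and the epoch finally
	 *  gets its EV_PUT -- plus EV_CLEANUP when the work was cancelled.)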
*/ 1852 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 1853 spin_lock_irq(&device->resource->req_lock); 1854 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 1855 drbd_remove_epoch_entry_interval(device, peer_req); 1856 if (peer_req->flags & EE_RESTART_REQUESTS) 1857 restart_conflicting_writes(device, sector, peer_req->i.size); 1858 spin_unlock_irq(&device->resource->req_lock); 1859 } else 1860 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1861 1862 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 1863 1864 return err; 1865 } 1866 1867 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 1868 { 1869 struct drbd_peer_request *peer_req = 1870 container_of(w, struct drbd_peer_request, w); 1871 struct drbd_peer_device *peer_device = peer_req->peer_device; 1872 int err; 1873 1874 err = drbd_send_ack(peer_device, ack, peer_req); 1875 dec_unacked(peer_device->device); 1876 1877 return err; 1878 } 1879 1880 static int e_send_superseded(struct drbd_work *w, int unused) 1881 { 1882 return e_send_ack(w, P_SUPERSEDED); 1883 } 1884 1885 static int e_send_retry_write(struct drbd_work *w, int unused) 1886 { 1887 struct drbd_peer_request *peer_req = 1888 container_of(w, struct drbd_peer_request, w); 1889 struct drbd_connection *connection = peer_req->peer_device->connection; 1890 1891 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 1892 P_RETRY_WRITE : P_SUPERSEDED); 1893 } 1894 1895 static bool seq_greater(u32 a, u32 b) 1896 { 1897 /* 1898 * We assume 32-bit wrap-around here. 1899 * For 24-bit wrap-around, we would have to shift: 1900 * a <<= 8; b <<= 8; 1901 */ 1902 return (s32)a - (s32)b > 0; 1903 } 1904 1905 static u32 seq_max(u32 a, u32 b) 1906 { 1907 return seq_greater(a, b) ? a : b; 1908 } 1909 1910 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 1911 { 1912 struct drbd_device *device = peer_device->device; 1913 unsigned int newest_peer_seq; 1914 1915 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 1916 spin_lock(&device->peer_seq_lock); 1917 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 1918 device->peer_seq = newest_peer_seq; 1919 spin_unlock(&device->peer_seq_lock); 1920 /* wake up only if we actually changed device->peer_seq */ 1921 if (peer_seq == newest_peer_seq) 1922 wake_up(&device->seq_wait); 1923 } 1924 } 1925 1926 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 1927 { 1928 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 1929 } 1930 1931 /* maybe change sync_ee into interval trees as well? */ 1932 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 1933 { 1934 struct drbd_peer_request *rs_req; 1935 bool rv = 0; 1936 1937 spin_lock_irq(&device->resource->req_lock); 1938 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 1939 if (overlaps(peer_req->i.sector, peer_req->i.size, 1940 rs_req->i.sector, rs_req->i.size)) { 1941 rv = 1; 1942 break; 1943 } 1944 } 1945 spin_unlock_irq(&device->resource->req_lock); 1946 1947 return rv; 1948 } 1949 1950 /* Called from receive_Data. 1951 * Synchronize packets on sock with packets on msock. 1952 * 1953 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 1954 * packet traveling on msock, they are still processed in the order they have 1955 * been sent. 1956 * 1957 * Note: we don't care for Ack packets overtaking P_DATA packets. 
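 *
 * (Worked example, added: on a two-primaries setup -- the only case in
 *  which we block at all -- with device->peer_seq == 7, a P_DATA carrying
 *  peer_seq == 9 waits until the packet with sequence number 8 has been
 *  processed, whereas peer_seq == 8 is accepted immediately and bumps
 *  device->peer_seq to 8.)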
1958 * 1959 * In case packet_seq is larger than device->peer_seq number, there are 1960 * outstanding packets on the msock. We wait for them to arrive. 1961 * In case we are the logically next packet, we update device->peer_seq 1962 * ourselves. Correctly handles 32bit wrap around. 1963 * 1964 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 1965 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 1966 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 1967 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around... 1968 * 1969 * returns 0 if we may process the packet, 1970 * -ERESTARTSYS if we were interrupted (by disconnect signal), -ETIMEDOUT if we gave up waiting for missing ack packets. */ 1971 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 1972 { 1973 struct drbd_device *device = peer_device->device; 1974 DEFINE_WAIT(wait); 1975 long timeout; 1976 int ret = 0, tp; 1977 1978 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 1979 return 0; 1980 1981 spin_lock(&device->peer_seq_lock); 1982 for (;;) { 1983 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 1984 device->peer_seq = seq_max(device->peer_seq, peer_seq); 1985 break; 1986 } 1987 1988 if (signal_pending(current)) { 1989 ret = -ERESTARTSYS; 1990 break; 1991 } 1992 1993 rcu_read_lock(); 1994 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries; 1995 rcu_read_unlock(); 1996 1997 if (!tp) 1998 break; 1999 2000 /* Only need to wait if two_primaries is enabled */ 2001 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2002 spin_unlock(&device->peer_seq_lock); 2003 rcu_read_lock(); 2004 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2005 rcu_read_unlock(); 2006 timeout = schedule_timeout(timeout); 2007 spin_lock(&device->peer_seq_lock); 2008 if (!timeout) { 2009 ret = -ETIMEDOUT; 2010 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2011 break; 2012 } 2013 } 2014 spin_unlock(&device->peer_seq_lock); 2015 finish_wait(&device->seq_wait, &wait); 2016 return ret; 2017 } 2018 2019 /* see also bio_flags_to_wire() 2020 * DRBD_REQ_*, because we need to semantically map the flags to data packet 2021 * flags and back. We may replicate to other kernel versions. */ 2022 static unsigned long wire_flags_to_bio(u32 dpf) 2023 { 2024 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2025 (dpf & DP_FUA ? REQ_FUA : 0) | 2026 (dpf & DP_FLUSH ? REQ_FLUSH : 0) | 2027 (dpf & DP_DISCARD ?
REQ_DISCARD : 0); 2028 } 2029 2030 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2031 unsigned int size) 2032 { 2033 struct drbd_interval *i; 2034 2035 repeat: 2036 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2037 struct drbd_request *req; 2038 struct bio_and_error m; 2039 2040 if (!i->local) 2041 continue; 2042 req = container_of(i, struct drbd_request, i); 2043 if (!(req->rq_state & RQ_POSTPONED)) 2044 continue; 2045 req->rq_state &= ~RQ_POSTPONED; 2046 __req_mod(req, NEG_ACKED, &m); 2047 spin_unlock_irq(&device->resource->req_lock); 2048 if (m.bio) 2049 complete_master_bio(device, &m); 2050 spin_lock_irq(&device->resource->req_lock); 2051 goto repeat; 2052 } 2053 } 2054 2055 static int handle_write_conflicts(struct drbd_device *device, 2056 struct drbd_peer_request *peer_req) 2057 { 2058 struct drbd_connection *connection = peer_req->peer_device->connection; 2059 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2060 sector_t sector = peer_req->i.sector; 2061 const unsigned int size = peer_req->i.size; 2062 struct drbd_interval *i; 2063 bool equal; 2064 int err; 2065 2066 /* 2067 * Inserting the peer request into the write_requests tree will prevent 2068 * new conflicting local requests from being added. 2069 */ 2070 drbd_insert_interval(&device->write_requests, &peer_req->i); 2071 2072 repeat: 2073 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2074 if (i == &peer_req->i) 2075 continue; 2076 2077 if (!i->local) { 2078 /* 2079 * Our peer has sent a conflicting remote request; this 2080 * should not happen in a two-node setup. Wait for the 2081 * earlier peer request to complete. 2082 */ 2083 err = drbd_wait_misc(device, i); 2084 if (err) 2085 goto out; 2086 goto repeat; 2087 } 2088 2089 equal = i->sector == sector && i->size == size; 2090 if (resolve_conflicts) { 2091 /* 2092 * If the peer request is fully contained within the 2093 * overlapping request, it can be considered overwritten 2094 * and thus superseded; otherwise, it will be retried 2095 * once all overlapping requests have completed. 2096 */ 2097 bool superseded = i->sector <= sector && i->sector + 2098 (i->size >> 9) >= sector + (size >> 9); 2099 2100 if (!equal) 2101 drbd_alert(device, "Concurrent writes detected: " 2102 "local=%llus +%u, remote=%llus +%u, " 2103 "assuming %s came first\n", 2104 (unsigned long long)i->sector, i->size, 2105 (unsigned long long)sector, size, 2106 superseded ? "local" : "remote"); 2107 2108 inc_unacked(device); 2109 peer_req->w.cb = superseded ? e_send_superseded : 2110 e_send_retry_write; 2111 list_add_tail(&peer_req->w.list, &device->done_ee); 2112 wake_asender(connection); 2113 2114 err = -ENOENT; 2115 goto out; 2116 } else { 2117 struct drbd_request *req = 2118 container_of(i, struct drbd_request, i); 2119 2120 if (!equal) 2121 drbd_alert(device, "Concurrent writes detected: " 2122 "local=%llus +%u, remote=%llus +%u\n", 2123 (unsigned long long)i->sector, i->size, 2124 (unsigned long long)sector, size); 2125 2126 if (req->rq_state & RQ_LOCAL_PENDING || 2127 !(req->rq_state & RQ_POSTPONED)) { 2128 /* 2129 * Wait for the node with the discard flag to 2130 * decide if this request has been superseded 2131 * or needs to be retried. 2132 * Requests that have been superseded will 2133 * disappear from the write_requests tree. 2134 * 2135 * In addition, wait for the conflicting 2136 * request to finish locally before submitting 2137 * the conflicting peer request. 
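 *
 * (Editor's worked example for the resolve_conflicts branch above, not part
 * of the original file: the "superseded" test
 *	i->sector <= sector && i->sector + (i->size >> 9) >= sector + (size >> 9)
 * checks full containment in 512-byte sector units.  With a local write i
 * covering sectors 8..23 (i->sector == 8, i->size == 8192 bytes) and a peer
 * write covering sectors 16..23 (sector == 16, size == 4096), it reads
 * 8 <= 16 && 24 >= 24, so the peer write is considered overwritten and is
 * completed via e_send_superseded; a peer write covering sectors 20..27
 * would not be contained and gets e_send_retry_write instead.)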
2138 */ 2139 err = drbd_wait_misc(device, &req->i); 2140 if (err) { 2141 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2142 fail_postponed_requests(device, sector, size); 2143 goto out; 2144 } 2145 goto repeat; 2146 } 2147 /* 2148 * Remember to restart the conflicting requests after 2149 * the new peer request has completed. 2150 */ 2151 peer_req->flags |= EE_RESTART_REQUESTS; 2152 } 2153 } 2154 err = 0; 2155 2156 out: 2157 if (err) 2158 drbd_remove_epoch_entry_interval(device, peer_req); 2159 return err; 2160 } 2161 2162 /* mirrored write */ 2163 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2164 { 2165 struct drbd_peer_device *peer_device; 2166 struct drbd_device *device; 2167 sector_t sector; 2168 struct drbd_peer_request *peer_req; 2169 struct p_data *p = pi->data; 2170 u32 peer_seq = be32_to_cpu(p->seq_num); 2171 int rw = WRITE; 2172 u32 dp_flags; 2173 int err, tp; 2174 2175 peer_device = conn_peer_device(connection, pi->vnr); 2176 if (!peer_device) 2177 return -EIO; 2178 device = peer_device->device; 2179 2180 if (!get_ldev(device)) { 2181 int err2; 2182 2183 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2184 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2185 atomic_inc(&connection->current_epoch->epoch_size); 2186 err2 = drbd_drain_block(peer_device, pi->size); 2187 if (!err) 2188 err = err2; 2189 return err; 2190 } 2191 2192 /* 2193 * Corresponding put_ldev done either below (on various errors), or in 2194 * drbd_peer_request_endio, if we successfully submit the data at the 2195 * end of this function. 2196 */ 2197 2198 sector = be64_to_cpu(p->sector); 2199 peer_req = read_in_block(peer_device, p->block_id, sector, pi->size); 2200 if (!peer_req) { 2201 put_ldev(device); 2202 return -EIO; 2203 } 2204 2205 peer_req->w.cb = e_end_block; 2206 2207 dp_flags = be32_to_cpu(p->dp_flags); 2208 rw |= wire_flags_to_bio(dp_flags); 2209 if (peer_req->pages == NULL) { 2210 D_ASSERT(device, peer_req->i.size == 0); 2211 D_ASSERT(device, dp_flags & DP_FLUSH); 2212 } 2213 2214 if (dp_flags & DP_MAY_SET_IN_SYNC) 2215 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2216 2217 spin_lock(&connection->epoch_lock); 2218 peer_req->epoch = connection->current_epoch; 2219 atomic_inc(&peer_req->epoch->epoch_size); 2220 atomic_inc(&peer_req->epoch->active); 2221 spin_unlock(&connection->epoch_lock); 2222 2223 rcu_read_lock(); 2224 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2225 rcu_read_unlock(); 2226 if (tp) { 2227 peer_req->flags |= EE_IN_INTERVAL_TREE; 2228 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2229 if (err) 2230 goto out_interrupted; 2231 spin_lock_irq(&device->resource->req_lock); 2232 err = handle_write_conflicts(device, peer_req); 2233 if (err) { 2234 spin_unlock_irq(&device->resource->req_lock); 2235 if (err == -ENOENT) { 2236 put_ldev(device); 2237 return 0; 2238 } 2239 goto out_interrupted; 2240 } 2241 } else { 2242 update_peer_seq(peer_device, peer_seq); 2243 spin_lock_irq(&device->resource->req_lock); 2244 } 2245 list_add(&peer_req->w.list, &device->active_ee); 2246 spin_unlock_irq(&device->resource->req_lock); 2247 2248 if (device->state.conn == C_SYNC_TARGET) 2249 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2250 2251 if (peer_device->connection->agreed_pro_version < 100) { 2252 rcu_read_lock(); 2253 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) { 2254 case DRBD_PROT_C: 2255 dp_flags |= DP_SEND_WRITE_ACK; 2256 break; 2257 
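/* (Editor's note, not part of the original file: for peers with an agreed
 * protocol version below 100 the ack policy is derived from the configured
 * wire protocol right here: DRBD protocol C asks for a P_WRITE_ACK once the
 * write has reached stable storage (DP_SEND_WRITE_ACK, handled in
 * e_end_block()), protocol B only for a P_RECV_ACK on receipt
 * (DP_SEND_RECEIVE_ACK, sent a few lines below), and protocol A for no
 * ack at all.) */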
case DRBD_PROT_B: 2258 dp_flags |= DP_SEND_RECEIVE_ACK; 2259 break; 2260 } 2261 rcu_read_unlock(); 2262 } 2263 2264 if (dp_flags & DP_SEND_WRITE_ACK) { 2265 peer_req->flags |= EE_SEND_WRITE_ACK; 2266 inc_unacked(device); 2267 /* corresponding dec_unacked() in e_end_block() 2268 * respective _drbd_clear_done_ee */ 2269 } 2270 2271 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2272 /* I really don't like it that the receiver thread 2273 * sends on the msock, but anyways */ 2274 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req); 2275 } 2276 2277 if (device->state.pdsk < D_INCONSISTENT) { 2278 /* In case we have the only disk of the cluster, */ 2279 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 2280 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2281 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2282 drbd_al_begin_io(device, &peer_req->i, true); 2283 } 2284 2285 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR); 2286 if (!err) 2287 return 0; 2288 2289 /* don't care for the reason here */ 2290 drbd_err(device, "submit failed, triggering re-connect\n"); 2291 spin_lock_irq(&device->resource->req_lock); 2292 list_del(&peer_req->w.list); 2293 drbd_remove_epoch_entry_interval(device, peer_req); 2294 spin_unlock_irq(&device->resource->req_lock); 2295 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) 2296 drbd_al_complete_io(device, &peer_req->i); 2297 2298 out_interrupted: 2299 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP); 2300 put_ldev(device); 2301 drbd_free_peer_req(device, peer_req); 2302 return err; 2303 } 2304 2305 /* We may throttle resync, if the lower device seems to be busy, 2306 * and current sync rate is above c_min_rate. 2307 * 2308 * To decide whether or not the lower device is busy, we use a scheme similar 2309 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2310 * (more than 64 sectors) of activity we cannot account for with our own resync 2311 * activity, it obviously is "busy". 2312 * 2313 * The current sync rate used here uses only the most recent two step marks, 2314 * to have a short time average so we can react faster. 2315 */ 2316 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector) 2317 { 2318 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 2319 unsigned long db, dt, dbdt; 2320 struct lc_element *tmp; 2321 int curr_events; 2322 int throttle = 0; 2323 unsigned int c_min_rate; 2324 2325 rcu_read_lock(); 2326 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2327 rcu_read_unlock(); 2328 2329 /* feature disabled? */ 2330 if (c_min_rate == 0) 2331 return 0; 2332 2333 spin_lock_irq(&device->al_lock); 2334 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2335 if (tmp) { 2336 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2337 if (test_bit(BME_PRIORITY, &bm_ext->flags)) { 2338 spin_unlock_irq(&device->al_lock); 2339 return 0; 2340 } 2341 /* Do not slow down if app IO is already waiting for this extent */ 2342 } 2343 spin_unlock_irq(&device->al_lock); 2344 2345 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + 2346 (int)part_stat_read(&disk->part0, sectors[1]) - 2347 atomic_read(&device->rs_sect_ev); 2348 2349 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) { 2350 unsigned long rs_left; 2351 int i; 2352 2353 device->rs_last_events = curr_events; 2354 2355 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2356 * approx. 
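 *
 * (Editor's worked example, not part of the original file, assuming the
 * usual 4 KiB resync granularity per bitmap bit: with dt == 4 seconds since
 * the reference mark and db == 8192 bits cleared in that window,
 * dbdt == Bit2KB(8192 / 4) == 8192 KiB/s; if c_min_rate is configured as
 * 4096, the resync already runs above the configured minimum, so together
 * with the >64 sectors of unaccounted disk activity detected above the
 * function reports "throttle".)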
*/ 2357 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2358 2359 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2360 rs_left = device->ov_left; 2361 else 2362 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2363 2364 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2365 if (!dt) 2366 dt++; 2367 db = device->rs_mark_left[i] - rs_left; 2368 dbdt = Bit2KB(db/dt); 2369 2370 if (dbdt > c_min_rate) 2371 throttle = 1; 2372 } 2373 return throttle; 2374 } 2375 2376 2377 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2378 { 2379 struct drbd_peer_device *peer_device; 2380 struct drbd_device *device; 2381 sector_t sector; 2382 sector_t capacity; 2383 struct drbd_peer_request *peer_req; 2384 struct digest_info *di = NULL; 2385 int size, verb; 2386 unsigned int fault_type; 2387 struct p_block_req *p = pi->data; 2388 2389 peer_device = conn_peer_device(connection, pi->vnr); 2390 if (!peer_device) 2391 return -EIO; 2392 device = peer_device->device; 2393 capacity = drbd_get_capacity(device->this_bdev); 2394 2395 sector = be64_to_cpu(p->sector); 2396 size = be32_to_cpu(p->blksize); 2397 2398 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2399 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2400 (unsigned long long)sector, size); 2401 return -EINVAL; 2402 } 2403 if (sector + (size>>9) > capacity) { 2404 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2405 (unsigned long long)sector, size); 2406 return -EINVAL; 2407 } 2408 2409 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2410 verb = 1; 2411 switch (pi->cmd) { 2412 case P_DATA_REQUEST: 2413 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2414 break; 2415 case P_RS_DATA_REQUEST: 2416 case P_CSUM_RS_REQUEST: 2417 case P_OV_REQUEST: 2418 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2419 break; 2420 case P_OV_REPLY: 2421 verb = 0; 2422 dec_rs_pending(device); 2423 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC); 2424 break; 2425 default: 2426 BUG(); 2427 } 2428 if (verb && __ratelimit(&drbd_ratelimit_state)) 2429 drbd_err(device, "Can not satisfy peer's read request, " 2430 "no local data.\n"); 2431 2432 /* drain possibly payload */ 2433 return drbd_drain_block(peer_device, pi->size); 2434 } 2435 2436 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2437 * "criss-cross" setup, that might cause write-out on some other DRBD, 2438 * which in turn might block on the other node at this very place. 
*/ 2439 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO); 2440 if (!peer_req) { 2441 put_ldev(device); 2442 return -ENOMEM; 2443 } 2444 2445 switch (pi->cmd) { 2446 case P_DATA_REQUEST: 2447 peer_req->w.cb = w_e_end_data_req; 2448 fault_type = DRBD_FAULT_DT_RD; 2449 /* application IO, don't drbd_rs_begin_io */ 2450 goto submit; 2451 2452 case P_RS_DATA_REQUEST: 2453 peer_req->w.cb = w_e_end_rsdata_req; 2454 fault_type = DRBD_FAULT_RS_RD; 2455 /* used in the sector offset progress display */ 2456 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2457 break; 2458 2459 case P_OV_REPLY: 2460 case P_CSUM_RS_REQUEST: 2461 fault_type = DRBD_FAULT_RS_RD; 2462 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2463 if (!di) 2464 goto out_free_e; 2465 2466 di->digest_size = pi->size; 2467 di->digest = (((char *)di)+sizeof(struct digest_info)); 2468 2469 peer_req->digest = di; 2470 peer_req->flags |= EE_HAS_DIGEST; 2471 2472 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2473 goto out_free_e; 2474 2475 if (pi->cmd == P_CSUM_RS_REQUEST) { 2476 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2477 peer_req->w.cb = w_e_end_csum_rs_req; 2478 /* used in the sector offset progress display */ 2479 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2480 } else if (pi->cmd == P_OV_REPLY) { 2481 /* track progress, we may need to throttle */ 2482 atomic_add(size >> 9, &device->rs_sect_in); 2483 peer_req->w.cb = w_e_end_ov_reply; 2484 dec_rs_pending(device); 2485 /* drbd_rs_begin_io done when we sent this request, 2486 * but accounting still needs to be done. */ 2487 goto submit_for_resync; 2488 } 2489 break; 2490 2491 case P_OV_REQUEST: 2492 if (device->ov_start_sector == ~(sector_t)0 && 2493 peer_device->connection->agreed_pro_version >= 90) { 2494 unsigned long now = jiffies; 2495 int i; 2496 device->ov_start_sector = sector; 2497 device->ov_position = sector; 2498 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2499 device->rs_total = device->ov_left; 2500 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2501 device->rs_mark_left[i] = device->ov_left; 2502 device->rs_mark_time[i] = now; 2503 } 2504 drbd_info(device, "Online Verify start sector: %llu\n", 2505 (unsigned long long)sector); 2506 } 2507 peer_req->w.cb = w_e_end_ov_req; 2508 fault_type = DRBD_FAULT_RS_RD; 2509 break; 2510 2511 default: 2512 BUG(); 2513 } 2514 2515 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2516 * wrt the receiver, but it is not as straightforward as it may seem. 2517 * Various places in the resync start and stop logic assume resync 2518 * requests are processed in order, requeuing this on the worker thread 2519 * introduces a bunch of new code for synchronization between threads. 2520 * 2521 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2522 * "forever", throttling after drbd_rs_begin_io will lock that extent 2523 * for application writes for the same time. For now, just throttle 2524 * here, where the rest of the code expects the receiver to sleep for 2525 * a while, anyways. 2526 */ 2527 2528 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2529 * this defers syncer requests for some time, before letting at least 2530 * on request through. The resync controller on the receiving side 2531 * will adapt to the incoming rate accordingly. 2532 * 2533 * We cannot throttle here if remote is Primary/SyncTarget: 2534 * we would also throttle its application reads. 
2535 * In that case, throttling is done on the SyncTarget only. 2536 */ 2537 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector)) 2538 schedule_timeout_uninterruptible(HZ/10); 2539 if (drbd_rs_begin_io(device, sector)) 2540 goto out_free_e; 2541 2542 submit_for_resync: 2543 atomic_add(size >> 9, &device->rs_sect_ev); 2544 2545 submit: 2546 inc_unacked(device); 2547 spin_lock_irq(&device->resource->req_lock); 2548 list_add_tail(&peer_req->w.list, &device->read_ee); 2549 spin_unlock_irq(&device->resource->req_lock); 2550 2551 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0) 2552 return 0; 2553 2554 /* don't care for the reason here */ 2555 drbd_err(device, "submit failed, triggering re-connect\n"); 2556 spin_lock_irq(&device->resource->req_lock); 2557 list_del(&peer_req->w.list); 2558 spin_unlock_irq(&device->resource->req_lock); 2559 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2560 2561 out_free_e: 2562 put_ldev(device); 2563 drbd_free_peer_req(device, peer_req); 2564 return -EIO; 2565 } 2566 2567 /** 2568 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2569 */ 2570 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2571 { 2572 struct drbd_device *device = peer_device->device; 2573 int self, peer, rv = -100; 2574 unsigned long ch_self, ch_peer; 2575 enum drbd_after_sb_p after_sb_0p; 2576 2577 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2578 peer = device->p_uuid[UI_BITMAP] & 1; 2579 2580 ch_peer = device->p_uuid[UI_SIZE]; 2581 ch_self = device->comm_bm_set; 2582 2583 rcu_read_lock(); 2584 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2585 rcu_read_unlock(); 2586 switch (after_sb_0p) { 2587 case ASB_CONSENSUS: 2588 case ASB_DISCARD_SECONDARY: 2589 case ASB_CALL_HELPER: 2590 case ASB_VIOLENTLY: 2591 drbd_err(device, "Configuration error.\n"); 2592 break; 2593 case ASB_DISCONNECT: 2594 break; 2595 case ASB_DISCARD_YOUNGER_PRI: 2596 if (self == 0 && peer == 1) { 2597 rv = -1; 2598 break; 2599 } 2600 if (self == 1 && peer == 0) { 2601 rv = 1; 2602 break; 2603 } 2604 /* Else fall through to one of the other strategies... */ 2605 case ASB_DISCARD_OLDER_PRI: 2606 if (self == 0 && peer == 1) { 2607 rv = 1; 2608 break; 2609 } 2610 if (self == 1 && peer == 0) { 2611 rv = -1; 2612 break; 2613 } 2614 /* Else fall through to one of the other strategies... */ 2615 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2616 "Using discard-least-changes instead\n"); 2617 case ASB_DISCARD_ZERO_CHG: 2618 if (ch_peer == 0 && ch_self == 0) { 2619 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2620 ? -1 : 1; 2621 break; 2622 } else { 2623 if (ch_peer == 0) { rv = 1; break; } 2624 if (ch_self == 0) { rv = -1; break; } 2625 } 2626 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2627 break; 2628 case ASB_DISCARD_LEAST_CHG: 2629 if (ch_self < ch_peer) 2630 rv = -1; 2631 else if (ch_self > ch_peer) 2632 rv = 1; 2633 else /* ( ch_self == ch_peer ) */ 2634 /* Well, then use something else. */ 2635 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2636 ? 
-1 : 1; 2637 break; 2638 case ASB_DISCARD_LOCAL: 2639 rv = -1; 2640 break; 2641 case ASB_DISCARD_REMOTE: 2642 rv = 1; 2643 } 2644 2645 return rv; 2646 } 2647 2648 /** 2649 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2650 */ 2651 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2652 { 2653 struct drbd_device *device = peer_device->device; 2654 int hg, rv = -100; 2655 enum drbd_after_sb_p after_sb_1p; 2656 2657 rcu_read_lock(); 2658 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2659 rcu_read_unlock(); 2660 switch (after_sb_1p) { 2661 case ASB_DISCARD_YOUNGER_PRI: 2662 case ASB_DISCARD_OLDER_PRI: 2663 case ASB_DISCARD_LEAST_CHG: 2664 case ASB_DISCARD_LOCAL: 2665 case ASB_DISCARD_REMOTE: 2666 case ASB_DISCARD_ZERO_CHG: 2667 drbd_err(device, "Configuration error.\n"); 2668 break; 2669 case ASB_DISCONNECT: 2670 break; 2671 case ASB_CONSENSUS: 2672 hg = drbd_asb_recover_0p(peer_device); 2673 if (hg == -1 && device->state.role == R_SECONDARY) 2674 rv = hg; 2675 if (hg == 1 && device->state.role == R_PRIMARY) 2676 rv = hg; 2677 break; 2678 case ASB_VIOLENTLY: 2679 rv = drbd_asb_recover_0p(peer_device); 2680 break; 2681 case ASB_DISCARD_SECONDARY: 2682 return device->state.role == R_PRIMARY ? 1 : -1; 2683 case ASB_CALL_HELPER: 2684 hg = drbd_asb_recover_0p(peer_device); 2685 if (hg == -1 && device->state.role == R_PRIMARY) { 2686 enum drbd_state_rv rv2; 2687 2688 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2689 * we might be here in C_WF_REPORT_PARAMS which is transient. 2690 * we do not need to wait for the after state change work either. */ 2691 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2692 if (rv2 != SS_SUCCESS) { 2693 drbd_khelper(device, "pri-lost-after-sb"); 2694 } else { 2695 drbd_warn(device, "Successfully gave up primary role.\n"); 2696 rv = hg; 2697 } 2698 } else 2699 rv = hg; 2700 } 2701 2702 return rv; 2703 } 2704 2705 /** 2706 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2707 */ 2708 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2709 { 2710 struct drbd_device *device = peer_device->device; 2711 int hg, rv = -100; 2712 enum drbd_after_sb_p after_sb_2p; 2713 2714 rcu_read_lock(); 2715 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2716 rcu_read_unlock(); 2717 switch (after_sb_2p) { 2718 case ASB_DISCARD_YOUNGER_PRI: 2719 case ASB_DISCARD_OLDER_PRI: 2720 case ASB_DISCARD_LEAST_CHG: 2721 case ASB_DISCARD_LOCAL: 2722 case ASB_DISCARD_REMOTE: 2723 case ASB_CONSENSUS: 2724 case ASB_DISCARD_SECONDARY: 2725 case ASB_DISCARD_ZERO_CHG: 2726 drbd_err(device, "Configuration error.\n"); 2727 break; 2728 case ASB_VIOLENTLY: 2729 rv = drbd_asb_recover_0p(peer_device); 2730 break; 2731 case ASB_DISCONNECT: 2732 break; 2733 case ASB_CALL_HELPER: 2734 hg = drbd_asb_recover_0p(peer_device); 2735 if (hg == -1) { 2736 enum drbd_state_rv rv2; 2737 2738 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2739 * we might be here in C_WF_REPORT_PARAMS which is transient. 2740 * we do not need to wait for the after state change work either. 
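 *
 * (Editor's worked example for the recover handlers above, not part of the
 * original file: the sign convention is rv == -1 "discard the local data,
 * become SyncTarget" and rv == 1 "discard the peer's data, become
 * SyncSource".  E.g. with after-sb-0pri discard-least-changes and
 * after-sb-1pri consensus, a node that is Secondary and has fewer changed
 * blocks (ch_self < ch_peer) gets -1 from drbd_asb_recover_0p();
 * drbd_asb_recover_1p() accepts that verdict because the losing side is
 * Secondary, and drbd_sync_handshake() later turns the negative result into
 * a resync from the peer.)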
*/ 2741 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2742 if (rv2 != SS_SUCCESS) { 2743 drbd_khelper(device, "pri-lost-after-sb"); 2744 } else { 2745 drbd_warn(device, "Successfully gave up primary role.\n"); 2746 rv = hg; 2747 } 2748 } else 2749 rv = hg; 2750 } 2751 2752 return rv; 2753 } 2754 2755 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 2756 u64 bits, u64 flags) 2757 { 2758 if (!uuid) { 2759 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 2760 return; 2761 } 2762 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2763 text, 2764 (unsigned long long)uuid[UI_CURRENT], 2765 (unsigned long long)uuid[UI_BITMAP], 2766 (unsigned long long)uuid[UI_HISTORY_START], 2767 (unsigned long long)uuid[UI_HISTORY_END], 2768 (unsigned long long)bits, 2769 (unsigned long long)flags); 2770 } 2771 2772 /* 2773 100 after split brain try auto recover 2774 2 C_SYNC_SOURCE set BitMap 2775 1 C_SYNC_SOURCE use BitMap 2776 0 no Sync 2777 -1 C_SYNC_TARGET use BitMap 2778 -2 C_SYNC_TARGET set BitMap 2779 -100 after split brain, disconnect 2780 -1000 unrelated data 2781 -1091 requires proto 91 2782 -1096 requires proto 96 2783 */ 2784 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local) 2785 { 2786 u64 self, peer; 2787 int i, j; 2788 2789 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2790 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2791 2792 *rule_nr = 10; 2793 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2794 return 0; 2795 2796 *rule_nr = 20; 2797 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2798 peer != UUID_JUST_CREATED) 2799 return -2; 2800 2801 *rule_nr = 30; 2802 if (self != UUID_JUST_CREATED && 2803 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2804 return 2; 2805 2806 if (self == peer) { 2807 int rct, dc; /* roles at crash time */ 2808 2809 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2810 2811 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2812 return -1091; 2813 2814 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2815 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2816 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2817 drbd_uuid_move_history(device); 2818 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 2819 device->ldev->md.uuid[UI_BITMAP] = 0; 2820 2821 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2822 device->state.disk >= D_NEGOTIATING ? 
drbd_bm_total_weight(device) : 0, 0); 2823 *rule_nr = 34; 2824 } else { 2825 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 2826 *rule_nr = 36; 2827 } 2828 2829 return 1; 2830 } 2831 2832 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 2833 2834 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2835 return -1091; 2836 2837 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 2838 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 2839 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 2840 2841 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 2842 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 2843 device->p_uuid[UI_BITMAP] = 0UL; 2844 2845 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2846 *rule_nr = 35; 2847 } else { 2848 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 2849 *rule_nr = 37; 2850 } 2851 2852 return -1; 2853 } 2854 2855 /* Common power [off|failure] */ 2856 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 2857 (device->p_uuid[UI_FLAGS] & 2); 2858 /* lowest bit is set when we were primary, 2859 * next bit (weight 2) is set when peer was primary */ 2860 *rule_nr = 40; 2861 2862 switch (rct) { 2863 case 0: /* !self_pri && !peer_pri */ return 0; 2864 case 1: /* self_pri && !peer_pri */ return 1; 2865 case 2: /* !self_pri && peer_pri */ return -1; 2866 case 3: /* self_pri && peer_pri */ 2867 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags); 2868 return dc ? -1 : 1; 2869 } 2870 } 2871 2872 *rule_nr = 50; 2873 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 2874 if (self == peer) 2875 return -1; 2876 2877 *rule_nr = 51; 2878 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 2879 if (self == peer) { 2880 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 2881 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 2882 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 2883 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 2884 /* The last P_SYNC_UUID did not get though. Undo the last start of 2885 resync as sync source modifications of the peer's UUIDs. */ 2886 2887 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2888 return -1091; 2889 2890 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 2891 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 2892 2893 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 2894 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2895 2896 return -1; 2897 } 2898 } 2899 2900 *rule_nr = 60; 2901 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2902 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2903 peer = device->p_uuid[i] & ~((u64)1); 2904 if (self == peer) 2905 return -2; 2906 } 2907 2908 *rule_nr = 70; 2909 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2910 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2911 if (self == peer) 2912 return 1; 2913 2914 *rule_nr = 71; 2915 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 2916 if (self == peer) { 2917 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 
2918 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 2919 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 2920 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 2921 /* The last P_SYNC_UUID did not get though. Undo the last start of 2922 resync as sync source modifications of our UUIDs. */ 2923 2924 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2925 return -1091; 2926 2927 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 2928 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 2929 2930 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 2931 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2932 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 2933 2934 return 1; 2935 } 2936 } 2937 2938 2939 *rule_nr = 80; 2940 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2941 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2942 self = device->ldev->md.uuid[i] & ~((u64)1); 2943 if (self == peer) 2944 return 2; 2945 } 2946 2947 *rule_nr = 90; 2948 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2949 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 2950 if (self == peer && self != ((u64)0)) 2951 return 100; 2952 2953 *rule_nr = 100; 2954 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2955 self = device->ldev->md.uuid[i] & ~((u64)1); 2956 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 2957 peer = device->p_uuid[j] & ~((u64)1); 2958 if (self == peer) 2959 return -100; 2960 } 2961 } 2962 2963 return -1000; 2964 } 2965 2966 /* drbd_sync_handshake() returns the new conn state on success, or 2967 CONN_MASK (-1) on failure. 2968 */ 2969 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 2970 enum drbd_role peer_role, 2971 enum drbd_disk_state peer_disk) __must_hold(local) 2972 { 2973 struct drbd_device *device = peer_device->device; 2974 enum drbd_conns rv = C_MASK; 2975 enum drbd_disk_state mydisk; 2976 struct net_conf *nc; 2977 int hg, rule_nr, rr_conflict, tentative; 2978 2979 mydisk = device->state.disk; 2980 if (mydisk == D_NEGOTIATING) 2981 mydisk = device->new_state_tmp.disk; 2982 2983 drbd_info(device, "drbd_sync_handshake:\n"); 2984 2985 spin_lock_irq(&device->ldev->md.uuid_lock); 2986 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 2987 drbd_uuid_dump(device, "peer", device->p_uuid, 2988 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2989 2990 hg = drbd_uuid_compare(device, &rule_nr); 2991 spin_unlock_irq(&device->ldev->md.uuid_lock); 2992 2993 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 2994 2995 if (hg == -1000) { 2996 drbd_alert(device, "Unrelated data, aborting!\n"); 2997 return C_MASK; 2998 } 2999 if (hg < -1000) { 3000 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3001 return C_MASK; 3002 } 3003 3004 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3005 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3006 int f = (hg == -100) || abs(hg) == 2; 3007 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3008 if (f) 3009 hg = hg*2; 3010 drbd_info(device, "Becoming sync %s due to disk states.\n", 3011 hg > 0 ? 
"source" : "target"); 3012 } 3013 3014 if (abs(hg) == 100) 3015 drbd_khelper(device, "initial-split-brain"); 3016 3017 rcu_read_lock(); 3018 nc = rcu_dereference(peer_device->connection->net_conf); 3019 3020 if (hg == 100 || (hg == -100 && nc->always_asbp)) { 3021 int pcount = (device->state.role == R_PRIMARY) 3022 + (peer_role == R_PRIMARY); 3023 int forced = (hg == -100); 3024 3025 switch (pcount) { 3026 case 0: 3027 hg = drbd_asb_recover_0p(peer_device); 3028 break; 3029 case 1: 3030 hg = drbd_asb_recover_1p(peer_device); 3031 break; 3032 case 2: 3033 hg = drbd_asb_recover_2p(peer_device); 3034 break; 3035 } 3036 if (abs(hg) < 100) { 3037 drbd_warn(device, "Split-Brain detected, %d primaries, " 3038 "automatically solved. Sync from %s node\n", 3039 pcount, (hg < 0) ? "peer" : "this"); 3040 if (forced) { 3041 drbd_warn(device, "Doing a full sync, since" 3042 " UUIDs where ambiguous.\n"); 3043 hg = hg*2; 3044 } 3045 } 3046 } 3047 3048 if (hg == -100) { 3049 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3050 hg = -1; 3051 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3052 hg = 1; 3053 3054 if (abs(hg) < 100) 3055 drbd_warn(device, "Split-Brain detected, manually solved. " 3056 "Sync from %s node\n", 3057 (hg < 0) ? "peer" : "this"); 3058 } 3059 rr_conflict = nc->rr_conflict; 3060 tentative = nc->tentative; 3061 rcu_read_unlock(); 3062 3063 if (hg == -100) { 3064 /* FIXME this log message is not correct if we end up here 3065 * after an attempted attach on a diskless node. 3066 * We just refuse to attach -- well, we drop the "connection" 3067 * to that disk, in a way... */ 3068 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3069 drbd_khelper(device, "split-brain"); 3070 return C_MASK; 3071 } 3072 3073 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3074 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3075 return C_MASK; 3076 } 3077 3078 if (hg < 0 && /* by intention we do not use mydisk here. */ 3079 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3080 switch (rr_conflict) { 3081 case ASB_CALL_HELPER: 3082 drbd_khelper(device, "pri-lost"); 3083 /* fall through */ 3084 case ASB_DISCONNECT: 3085 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3086 return C_MASK; 3087 case ASB_VIOLENTLY: 3088 drbd_warn(device, "Becoming SyncTarget, violating the stable-data" 3089 "assumption\n"); 3090 } 3091 } 3092 3093 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3094 if (hg == 0) 3095 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3096 else 3097 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.", 3098 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3099 abs(hg) >= 2 ? "full" : "bit-map based"); 3100 return C_MASK; 3101 } 3102 3103 if (abs(hg) >= 2) { 3104 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3105 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3106 BM_LOCKED_SET_ALLOWED)) 3107 return C_MASK; 3108 } 3109 3110 if (hg > 0) { /* become sync source. 
*/ 3111 rv = C_WF_BITMAP_S; 3112 } else if (hg < 0) { /* become sync target */ 3113 rv = C_WF_BITMAP_T; 3114 } else { 3115 rv = C_CONNECTED; 3116 if (drbd_bm_total_weight(device)) { 3117 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3118 drbd_bm_total_weight(device)); 3119 } 3120 } 3121 3122 return rv; 3123 } 3124 3125 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3126 { 3127 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3128 if (peer == ASB_DISCARD_REMOTE) 3129 return ASB_DISCARD_LOCAL; 3130 3131 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 3132 if (peer == ASB_DISCARD_LOCAL) 3133 return ASB_DISCARD_REMOTE; 3134 3135 /* everything else is valid if they are equal on both sides. */ 3136 return peer; 3137 } 3138 3139 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3140 { 3141 struct p_protocol *p = pi->data; 3142 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3143 int p_proto, p_discard_my_data, p_two_primaries, cf; 3144 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3145 char integrity_alg[SHARED_SECRET_MAX] = ""; 3146 struct crypto_hash *peer_integrity_tfm = NULL; 3147 void *int_dig_in = NULL, *int_dig_vv = NULL; 3148 3149 p_proto = be32_to_cpu(p->protocol); 3150 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3151 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3152 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3153 p_two_primaries = be32_to_cpu(p->two_primaries); 3154 cf = be32_to_cpu(p->conn_flags); 3155 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3156 3157 if (connection->agreed_pro_version >= 87) { 3158 int err; 3159 3160 if (pi->size > sizeof(integrity_alg)) 3161 return -EIO; 3162 err = drbd_recv_all(connection, integrity_alg, pi->size); 3163 if (err) 3164 return err; 3165 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3166 } 3167 3168 if (pi->cmd != P_PROTOCOL_UPDATE) { 3169 clear_bit(CONN_DRY_RUN, &connection->flags); 3170 3171 if (cf & CF_DRY_RUN) 3172 set_bit(CONN_DRY_RUN, &connection->flags); 3173 3174 rcu_read_lock(); 3175 nc = rcu_dereference(connection->net_conf); 3176 3177 if (p_proto != nc->wire_protocol) { 3178 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3179 goto disconnect_rcu_unlock; 3180 } 3181 3182 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3183 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3184 goto disconnect_rcu_unlock; 3185 } 3186 3187 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3188 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3189 goto disconnect_rcu_unlock; 3190 } 3191 3192 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3193 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3194 goto disconnect_rcu_unlock; 3195 } 3196 3197 if (p_discard_my_data && nc->discard_my_data) { 3198 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3199 goto disconnect_rcu_unlock; 3200 } 3201 3202 if (p_two_primaries != nc->two_primaries) { 3203 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3204 goto disconnect_rcu_unlock; 3205 } 3206 3207 if (strcmp(integrity_alg, nc->integrity_alg)) { 3208 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3209 goto disconnect_rcu_unlock; 3210 } 3211 3212 rcu_read_unlock(); 3213 } 3214 3215 if (integrity_alg[0]) { 3216 int hash_size; 3217 3218 /* 3219 * We can only change the peer data integrity algorithm 3220 * here. 
Changing our own data integrity algorithm 3221 * requires that we send a P_PROTOCOL_UPDATE packet at 3222 * the same time; otherwise, the peer has no way to 3223 * tell between which packets the algorithm should 3224 * change. 3225 */ 3226 3227 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC); 3228 if (!peer_integrity_tfm) { 3229 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3230 integrity_alg); 3231 goto disconnect; 3232 } 3233 3234 hash_size = crypto_hash_digestsize(peer_integrity_tfm); 3235 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3236 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3237 if (!(int_dig_in && int_dig_vv)) { 3238 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3239 goto disconnect; 3240 } 3241 } 3242 3243 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); 3244 if (!new_net_conf) { 3245 drbd_err(connection, "Allocation of new net_conf failed\n"); 3246 goto disconnect; 3247 } 3248 3249 mutex_lock(&connection->data.mutex); 3250 mutex_lock(&connection->resource->conf_update); 3251 old_net_conf = connection->net_conf; 3252 *new_net_conf = *old_net_conf; 3253 3254 new_net_conf->wire_protocol = p_proto; 3255 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3256 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3257 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3258 new_net_conf->two_primaries = p_two_primaries; 3259 3260 rcu_assign_pointer(connection->net_conf, new_net_conf); 3261 mutex_unlock(&connection->resource->conf_update); 3262 mutex_unlock(&connection->data.mutex); 3263 3264 crypto_free_hash(connection->peer_integrity_tfm); 3265 kfree(connection->int_dig_in); 3266 kfree(connection->int_dig_vv); 3267 connection->peer_integrity_tfm = peer_integrity_tfm; 3268 connection->int_dig_in = int_dig_in; 3269 connection->int_dig_vv = int_dig_vv; 3270 3271 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3272 drbd_info(connection, "peer data-integrity-alg: %s\n", 3273 integrity_alg[0] ? integrity_alg : "(none)"); 3274 3275 synchronize_rcu(); 3276 kfree(old_net_conf); 3277 return 0; 3278 3279 disconnect_rcu_unlock: 3280 rcu_read_unlock(); 3281 disconnect: 3282 crypto_free_hash(peer_integrity_tfm); 3283 kfree(int_dig_in); 3284 kfree(int_dig_vv); 3285 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3286 return -EIO; 3287 } 3288 3289 /* helper function 3290 * input: alg name, feature name 3291 * return: NULL (alg name was "") 3292 * ERR_PTR(error) if something goes wrong 3293 * or the crypto hash ptr, if it worked out ok. 
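 *
 * (Editor's usage sketch, not part of the original file: the callers in
 * receive_SyncParam() below use it as
 *	verify_tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
 *	if (IS_ERR(verify_tfm)) {
 *		verify_tfm = NULL;	(don't hand an ERR_PTR to the error path)
 *		goto disconnect;
 *	}
 * and a NULL return simply means the algorithm name was empty, i.e. the
 * feature is not configured.)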
*/ 3294 static 3295 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, 3296 const char *alg, const char *name) 3297 { 3298 struct crypto_hash *tfm; 3299 3300 if (!alg[0]) 3301 return NULL; 3302 3303 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 3304 if (IS_ERR(tfm)) { 3305 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3306 alg, name, PTR_ERR(tfm)); 3307 return tfm; 3308 } 3309 return tfm; 3310 } 3311 3312 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3313 { 3314 void *buffer = connection->data.rbuf; 3315 int size = pi->size; 3316 3317 while (size) { 3318 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3319 s = drbd_recv(connection, buffer, s); 3320 if (s <= 0) { 3321 if (s < 0) 3322 return s; 3323 break; 3324 } 3325 size -= s; 3326 } 3327 if (size) 3328 return -EIO; 3329 return 0; 3330 } 3331 3332 /* 3333 * config_unknown_volume - device configuration command for unknown volume 3334 * 3335 * When a device is added to an existing connection, the node on which the 3336 * device is added first will send configuration commands to its peer but the 3337 * peer will not know about the device yet. It will warn and ignore these 3338 * commands. Once the device is added on the second node, the second node will 3339 * send the same device configuration commands, but in the other direction. 3340 * 3341 * (We can also end up here if drbd is misconfigured.) 3342 */ 3343 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3344 { 3345 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3346 cmdname(pi->cmd), pi->vnr); 3347 return ignore_remaining_packet(connection, pi); 3348 } 3349 3350 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3351 { 3352 struct drbd_peer_device *peer_device; 3353 struct drbd_device *device; 3354 struct p_rs_param_95 *p; 3355 unsigned int header_size, data_size, exp_max_sz; 3356 struct crypto_hash *verify_tfm = NULL; 3357 struct crypto_hash *csums_tfm = NULL; 3358 struct net_conf *old_net_conf, *new_net_conf = NULL; 3359 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3360 const int apv = connection->agreed_pro_version; 3361 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3362 int fifo_size = 0; 3363 int err; 3364 3365 peer_device = conn_peer_device(connection, pi->vnr); 3366 if (!peer_device) 3367 return config_unknown_volume(connection, pi); 3368 device = peer_device->device; 3369 3370 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3371 : apv == 88 ? sizeof(struct p_rs_param) 3372 + SHARED_SECRET_MAX 3373 : apv <= 94 ? 
sizeof(struct p_rs_param_89) 3374 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3375 3376 if (pi->size > exp_max_sz) { 3377 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3378 pi->size, exp_max_sz); 3379 return -EIO; 3380 } 3381 3382 if (apv <= 88) { 3383 header_size = sizeof(struct p_rs_param); 3384 data_size = pi->size - header_size; 3385 } else if (apv <= 94) { 3386 header_size = sizeof(struct p_rs_param_89); 3387 data_size = pi->size - header_size; 3388 D_ASSERT(device, data_size == 0); 3389 } else { 3390 header_size = sizeof(struct p_rs_param_95); 3391 data_size = pi->size - header_size; 3392 D_ASSERT(device, data_size == 0); 3393 } 3394 3395 /* initialize verify_alg and csums_alg */ 3396 p = pi->data; 3397 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 3398 3399 err = drbd_recv_all(peer_device->connection, p, header_size); 3400 if (err) 3401 return err; 3402 3403 mutex_lock(&connection->resource->conf_update); 3404 old_net_conf = peer_device->connection->net_conf; 3405 if (get_ldev(device)) { 3406 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3407 if (!new_disk_conf) { 3408 put_ldev(device); 3409 mutex_unlock(&connection->resource->conf_update); 3410 drbd_err(device, "Allocation of new disk_conf failed\n"); 3411 return -ENOMEM; 3412 } 3413 3414 old_disk_conf = device->ldev->disk_conf; 3415 *new_disk_conf = *old_disk_conf; 3416 3417 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3418 } 3419 3420 if (apv >= 88) { 3421 if (apv == 88) { 3422 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3423 drbd_err(device, "verify-alg of wrong size, " 3424 "peer wants %u, accepting only up to %u byte\n", 3425 data_size, SHARED_SECRET_MAX); 3426 err = -EIO; 3427 goto reconnect; 3428 } 3429 3430 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3431 if (err) 3432 goto reconnect; 3433 /* we expect NUL terminated string */ 3434 /* but just in case someone tries to be evil */ 3435 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3436 p->verify_alg[data_size-1] = 0; 3437 3438 } else /* apv >= 89 */ { 3439 /* we still expect NUL terminated strings */ 3440 /* but just in case someone tries to be evil */ 3441 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3442 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3443 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3444 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3445 } 3446 3447 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3448 if (device->state.conn == C_WF_REPORT_PARAMS) { 3449 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3450 old_net_conf->verify_alg, p->verify_alg); 3451 goto disconnect; 3452 } 3453 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3454 p->verify_alg, "verify-alg"); 3455 if (IS_ERR(verify_tfm)) { 3456 verify_tfm = NULL; 3457 goto disconnect; 3458 } 3459 } 3460 3461 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3462 if (device->state.conn == C_WF_REPORT_PARAMS) { 3463 drbd_err(device, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 3464 old_net_conf->csums_alg, p->csums_alg); 3465 goto disconnect; 3466 } 3467 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3468 p->csums_alg, "csums-alg"); 3469 if (IS_ERR(csums_tfm)) { 3470 csums_tfm = NULL; 3471 goto disconnect; 3472 } 3473 } 3474 3475 if (apv > 94 && new_disk_conf) { 3476 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3477 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3478 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3479 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3480 3481 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3482 if (fifo_size != device->rs_plan_s->size) { 3483 new_plan = fifo_alloc(fifo_size); 3484 if (!new_plan) { 3485 drbd_err(device, "kmalloc of fifo_buffer failed"); 3486 put_ldev(device); 3487 goto disconnect; 3488 } 3489 } 3490 } 3491 3492 if (verify_tfm || csums_tfm) { 3493 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); 3494 if (!new_net_conf) { 3495 drbd_err(device, "Allocation of new net_conf failed\n"); 3496 goto disconnect; 3497 } 3498 3499 *new_net_conf = *old_net_conf; 3500 3501 if (verify_tfm) { 3502 strcpy(new_net_conf->verify_alg, p->verify_alg); 3503 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3504 crypto_free_hash(peer_device->connection->verify_tfm); 3505 peer_device->connection->verify_tfm = verify_tfm; 3506 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3507 } 3508 if (csums_tfm) { 3509 strcpy(new_net_conf->csums_alg, p->csums_alg); 3510 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3511 crypto_free_hash(peer_device->connection->csums_tfm); 3512 peer_device->connection->csums_tfm = csums_tfm; 3513 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3514 } 3515 rcu_assign_pointer(connection->net_conf, new_net_conf); 3516 } 3517 } 3518 3519 if (new_disk_conf) { 3520 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3521 put_ldev(device); 3522 } 3523 3524 if (new_plan) { 3525 old_plan = device->rs_plan_s; 3526 rcu_assign_pointer(device->rs_plan_s, new_plan); 3527 } 3528 3529 mutex_unlock(&connection->resource->conf_update); 3530 synchronize_rcu(); 3531 if (new_net_conf) 3532 kfree(old_net_conf); 3533 kfree(old_disk_conf); 3534 kfree(old_plan); 3535 3536 return 0; 3537 3538 reconnect: 3539 if (new_disk_conf) { 3540 put_ldev(device); 3541 kfree(new_disk_conf); 3542 } 3543 mutex_unlock(&connection->resource->conf_update); 3544 return -EIO; 3545 3546 disconnect: 3547 kfree(new_plan); 3548 if (new_disk_conf) { 3549 put_ldev(device); 3550 kfree(new_disk_conf); 3551 } 3552 mutex_unlock(&connection->resource->conf_update); 3553 /* just for completeness: actually not needed, 3554 * as this is not reached if csums_tfm was ok. */ 3555 crypto_free_hash(csums_tfm); 3556 /* but free the verify_tfm again, if csums_tfm did not work out */ 3557 crypto_free_hash(verify_tfm); 3558 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3559 return -EIO; 3560 } 3561 3562 /* warn if the arguments differ by more than 12.5% */ 3563 static void warn_if_differ_considerably(struct drbd_device *device, 3564 const char *s, sector_t a, sector_t b) 3565 { 3566 sector_t d; 3567 if (a == 0 || b == 0) 3568 return; 3569 d = (a > b) ? (a - b) : (b - a); 3570 if (d > (a>>3) || d > (b>>3)) 3571 drbd_warn(device, "Considerable difference in %s: %llus vs. 
%llus\n", s, 3572 (unsigned long long)a, (unsigned long long)b); 3573 } 3574 3575 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3576 { 3577 struct drbd_peer_device *peer_device; 3578 struct drbd_device *device; 3579 struct p_sizes *p = pi->data; 3580 enum determine_dev_size dd = DS_UNCHANGED; 3581 sector_t p_size, p_usize, my_usize; 3582 int ldsc = 0; /* local disk size changed */ 3583 enum dds_flags ddsf; 3584 3585 peer_device = conn_peer_device(connection, pi->vnr); 3586 if (!peer_device) 3587 return config_unknown_volume(connection, pi); 3588 device = peer_device->device; 3589 3590 p_size = be64_to_cpu(p->d_size); 3591 p_usize = be64_to_cpu(p->u_size); 3592 3593 /* just store the peer's disk size for now. 3594 * we still need to figure out whether we accept that. */ 3595 device->p_size = p_size; 3596 3597 if (get_ldev(device)) { 3598 rcu_read_lock(); 3599 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3600 rcu_read_unlock(); 3601 3602 warn_if_differ_considerably(device, "lower level device sizes", 3603 p_size, drbd_get_max_capacity(device->ldev)); 3604 warn_if_differ_considerably(device, "user requested size", 3605 p_usize, my_usize); 3606 3607 /* if this is the first connect, or an otherwise expected 3608 * param exchange, choose the minimum */ 3609 if (device->state.conn == C_WF_REPORT_PARAMS) 3610 p_usize = min_not_zero(my_usize, p_usize); 3611 3612 /* Never shrink a device with usable data during connect. 3613 But allow online shrinking if we are connected. */ 3614 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) < 3615 drbd_get_capacity(device->this_bdev) && 3616 device->state.disk >= D_OUTDATED && 3617 device->state.conn < C_CONNECTED) { 3618 drbd_err(device, "The peer's disk size is too small!\n"); 3619 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3620 put_ldev(device); 3621 return -EIO; 3622 } 3623 3624 if (my_usize != p_usize) { 3625 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3626 3627 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3628 if (!new_disk_conf) { 3629 drbd_err(device, "Allocation of new disk_conf failed\n"); 3630 put_ldev(device); 3631 return -ENOMEM; 3632 } 3633 3634 mutex_lock(&connection->resource->conf_update); 3635 old_disk_conf = device->ldev->disk_conf; 3636 *new_disk_conf = *old_disk_conf; 3637 new_disk_conf->disk_size = p_usize; 3638 3639 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3640 mutex_unlock(&connection->resource->conf_update); 3641 synchronize_rcu(); 3642 kfree(old_disk_conf); 3643 3644 drbd_info(device, "Peer sets u_size to %lu sectors\n", 3645 (unsigned long)my_usize); 3646 } 3647 3648 put_ldev(device); 3649 } 3650 3651 ddsf = be16_to_cpu(p->dds_flags); 3652 if (get_ldev(device)) { 3653 dd = drbd_determine_dev_size(device, ddsf, NULL); 3654 put_ldev(device); 3655 if (dd == DS_ERROR) 3656 return -EIO; 3657 drbd_md_sync(device); 3658 } else { 3659 /* I am diskless, need to accept the peer's size. 
*/ 3660 drbd_set_my_capacity(device, p_size); 3661 } 3662 3663 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3664 drbd_reconsider_max_bio_size(device); 3665 3666 if (get_ldev(device)) { 3667 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 3668 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 3669 ldsc = 1; 3670 } 3671 3672 put_ldev(device); 3673 } 3674 3675 if (device->state.conn > C_WF_REPORT_PARAMS) { 3676 if (be64_to_cpu(p->c_size) != 3677 drbd_get_capacity(device->this_bdev) || ldsc) { 3678 /* we have different sizes, probably peer 3679 * needs to know my new size... */ 3680 drbd_send_sizes(peer_device, 0, ddsf); 3681 } 3682 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 3683 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 3684 if (device->state.pdsk >= D_INCONSISTENT && 3685 device->state.disk >= D_INCONSISTENT) { 3686 if (ddsf & DDSF_NO_RESYNC) 3687 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 3688 else 3689 resync_after_online_grow(device); 3690 } else 3691 set_bit(RESYNC_AFTER_NEG, &device->flags); 3692 } 3693 } 3694 3695 return 0; 3696 } 3697 3698 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 3699 { 3700 struct drbd_peer_device *peer_device; 3701 struct drbd_device *device; 3702 struct p_uuids *p = pi->data; 3703 u64 *p_uuid; 3704 int i, updated_uuids = 0; 3705 3706 peer_device = conn_peer_device(connection, pi->vnr); 3707 if (!peer_device) 3708 return config_unknown_volume(connection, pi); 3709 device = peer_device->device; 3710 3711 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 3712 if (!p_uuid) { 3713 drbd_err(device, "kmalloc of p_uuid failed\n"); 3714 return false; 3715 } 3716 3717 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 3718 p_uuid[i] = be64_to_cpu(p->uuid[i]); 3719 3720 kfree(device->p_uuid); 3721 device->p_uuid = p_uuid; 3722 3723 if (device->state.conn < C_CONNECTED && 3724 device->state.disk < D_INCONSISTENT && 3725 device->state.role == R_PRIMARY && 3726 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 3727 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 3728 (unsigned long long)device->ed_uuid); 3729 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3730 return -EIO; 3731 } 3732 3733 if (get_ldev(device)) { 3734 int skip_initial_sync = 3735 device->state.conn == C_CONNECTED && 3736 peer_device->connection->agreed_pro_version >= 90 && 3737 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 3738 (p_uuid[UI_FLAGS] & 8); 3739 if (skip_initial_sync) { 3740 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 3741 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 3742 "clear_n_write from receive_uuids", 3743 BM_LOCKED_TEST_ALLOWED); 3744 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 3745 _drbd_uuid_set(device, UI_BITMAP, 0); 3746 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3747 CS_VERBOSE, NULL); 3748 drbd_md_sync(device); 3749 updated_uuids = 1; 3750 } 3751 put_ldev(device); 3752 } else if (device->state.disk < D_INCONSISTENT && 3753 device->state.role == R_PRIMARY) { 3754 /* I am a diskless primary, the peer just created a new current UUID 3755 for me. 
*/ 3756 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3757 } 3758 3759 /* Before we test for the disk state, we should wait until an eventually 3760 ongoing cluster wide state change is finished. That is important if 3761 we are primary and are detaching from our disk. We need to see the 3762 new disk state... */ 3763 mutex_lock(device->state_mutex); 3764 mutex_unlock(device->state_mutex); 3765 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 3766 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3767 3768 if (updated_uuids) 3769 drbd_print_uuids(device, "receiver updated UUIDs to"); 3770 3771 return 0; 3772 } 3773 3774 /** 3775 * convert_state() - Converts the peer's view of the cluster state to our point of view 3776 * @ps: The state as seen by the peer. 3777 */ 3778 static union drbd_state convert_state(union drbd_state ps) 3779 { 3780 union drbd_state ms; 3781 3782 static enum drbd_conns c_tab[] = { 3783 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 3784 [C_CONNECTED] = C_CONNECTED, 3785 3786 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3787 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3788 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3789 [C_VERIFY_S] = C_VERIFY_T, 3790 [C_MASK] = C_MASK, 3791 }; 3792 3793 ms.i = ps.i; 3794 3795 ms.conn = c_tab[ps.conn]; 3796 ms.peer = ps.role; 3797 ms.role = ps.peer; 3798 ms.pdsk = ps.disk; 3799 ms.disk = ps.pdsk; 3800 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3801 3802 return ms; 3803 } 3804 3805 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 3806 { 3807 struct drbd_peer_device *peer_device; 3808 struct drbd_device *device; 3809 struct p_req_state *p = pi->data; 3810 union drbd_state mask, val; 3811 enum drbd_state_rv rv; 3812 3813 peer_device = conn_peer_device(connection, pi->vnr); 3814 if (!peer_device) 3815 return -EIO; 3816 device = peer_device->device; 3817 3818 mask.i = be32_to_cpu(p->mask); 3819 val.i = be32_to_cpu(p->val); 3820 3821 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 3822 mutex_is_locked(device->state_mutex)) { 3823 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 3824 return 0; 3825 } 3826 3827 mask = convert_state(mask); 3828 val = convert_state(val); 3829 3830 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 3831 drbd_send_sr_reply(peer_device, rv); 3832 3833 drbd_md_sync(device); 3834 3835 return 0; 3836 } 3837 3838 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 3839 { 3840 struct p_req_state *p = pi->data; 3841 union drbd_state mask, val; 3842 enum drbd_state_rv rv; 3843 3844 mask.i = be32_to_cpu(p->mask); 3845 val.i = be32_to_cpu(p->val); 3846 3847 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 3848 mutex_is_locked(&connection->cstate_mutex)) { 3849 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 3850 return 0; 3851 } 3852 3853 mask = convert_state(mask); 3854 val = convert_state(val); 3855 3856 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 3857 conn_send_sr_reply(connection, rv); 3858 3859 return 0; 3860 } 3861 3862 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 3863 { 3864 struct drbd_peer_device *peer_device; 3865 struct drbd_device *device; 3866 struct p_state *p = pi->data; 3867 union drbd_state os, ns, peer_state; 3868 enum drbd_disk_state real_peer_disk; 3869 enum chg_state_flags cs_flags; 3870 int rv; 3871 3872 peer_device = 
conn_peer_device(connection, pi->vnr); 3873 if (!peer_device) 3874 return config_unknown_volume(connection, pi); 3875 device = peer_device->device; 3876 3877 peer_state.i = be32_to_cpu(p->state); 3878 3879 real_peer_disk = peer_state.disk; 3880 if (peer_state.disk == D_NEGOTIATING) { 3881 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 3882 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 3883 } 3884 3885 spin_lock_irq(&device->resource->req_lock); 3886 retry: 3887 os = ns = drbd_read_state(device); 3888 spin_unlock_irq(&device->resource->req_lock); 3889 3890 /* If some other part of the code (asender thread, timeout) 3891 * already decided to close the connection again, 3892 * we must not "re-establish" it here. */ 3893 if (os.conn <= C_TEAR_DOWN) 3894 return -ECONNRESET; 3895 3896 /* If this is the "end of sync" confirmation, usually the peer disk 3897 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 3898 * set) resync started in PausedSyncT, or if the timing of pause-/ 3899 * unpause-sync events has been "just right", the peer disk may 3900 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 3901 */ 3902 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 3903 real_peer_disk == D_UP_TO_DATE && 3904 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 3905 /* If we are (becoming) SyncSource, but peer is still in sync 3906 * preparation, ignore its uptodate-ness to avoid flapping, it 3907 * will change to inconsistent once the peer reaches active 3908 * syncing states. 3909 * It may have changed syncer-paused flags, however, so we 3910 * cannot ignore this completely. */ 3911 if (peer_state.conn > C_CONNECTED && 3912 peer_state.conn < C_SYNC_SOURCE) 3913 real_peer_disk = D_INCONSISTENT; 3914 3915 /* if peer_state changes to connected at the same time, 3916 * it explicitly notifies us that it finished resync. 3917 * Maybe we should finish it up, too? */ 3918 else if (os.conn >= C_SYNC_SOURCE && 3919 peer_state.conn == C_CONNECTED) { 3920 if (drbd_bm_total_weight(device) <= device->rs_failed) 3921 drbd_resync_finished(device); 3922 return 0; 3923 } 3924 } 3925 3926 /* explicit verify finished notification, stop sector reached. */ 3927 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 3928 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 3929 ov_out_of_sync_print(device); 3930 drbd_resync_finished(device); 3931 return 0; 3932 } 3933 3934 /* peer says his disk is inconsistent, while we think it is uptodate, 3935 * and this happens while the peer still thinks we have a sync going on, 3936 * but we think we are already done with the sync. 3937 * We ignore this to avoid flapping pdsk. 3938 * This should not happen, if the peer is a recent version of drbd. 
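   (The check below simply overrides real_peer_disk back to D_UP_TO_DATE
   in that case.)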
*/ 3939 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 3940 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 3941 real_peer_disk = D_UP_TO_DATE; 3942 3943 if (ns.conn == C_WF_REPORT_PARAMS) 3944 ns.conn = C_CONNECTED; 3945 3946 if (peer_state.conn == C_AHEAD) 3947 ns.conn = C_BEHIND; 3948 3949 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 3950 get_ldev_if_state(device, D_NEGOTIATING)) { 3951 int cr; /* consider resync */ 3952 3953 /* if we established a new connection */ 3954 cr = (os.conn < C_CONNECTED); 3955 /* if we had an established connection 3956 * and one of the nodes newly attaches a disk */ 3957 cr |= (os.conn == C_CONNECTED && 3958 (peer_state.disk == D_NEGOTIATING || 3959 os.disk == D_NEGOTIATING)); 3960 /* if we have both been inconsistent, and the peer has been 3961 * forced to be UpToDate with --overwrite-data */ 3962 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 3963 /* if we had been plain connected, and the admin requested to 3964 * start a sync by "invalidate" or "invalidate-remote" */ 3965 cr |= (os.conn == C_CONNECTED && 3966 (peer_state.conn >= C_STARTING_SYNC_S && 3967 peer_state.conn <= C_WF_BITMAP_T)); 3968 3969 if (cr) 3970 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 3971 3972 put_ldev(device); 3973 if (ns.conn == C_MASK) { 3974 ns.conn = C_CONNECTED; 3975 if (device->state.disk == D_NEGOTIATING) { 3976 drbd_force_state(device, NS(disk, D_FAILED)); 3977 } else if (peer_state.disk == D_NEGOTIATING) { 3978 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 3979 peer_state.disk = D_DISKLESS; 3980 real_peer_disk = D_DISKLESS; 3981 } else { 3982 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 3983 return -EIO; 3984 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 3985 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3986 return -EIO; 3987 } 3988 } 3989 } 3990 3991 spin_lock_irq(&device->resource->req_lock); 3992 if (os.i != drbd_read_state(device).i) 3993 goto retry; 3994 clear_bit(CONSIDER_RESYNC, &device->flags); 3995 ns.peer = peer_state.role; 3996 ns.pdsk = real_peer_disk; 3997 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 3998 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 3999 ns.disk = device->new_state_tmp.disk; 4000 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4001 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4002 test_bit(NEW_CUR_UUID, &device->flags)) { 4003 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4004 for temporal network outages! 
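   A peer that rebooted no longer has the matching request state, so below
   we clear the transfer log, generate a new current UUID, and drop the
   connection with C_PROTOCOL_ERROR instead.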
*/ 4005 spin_unlock_irq(&device->resource->req_lock); 4006 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4007 tl_clear(peer_device->connection); 4008 drbd_uuid_new_current(device); 4009 clear_bit(NEW_CUR_UUID, &device->flags); 4010 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4011 return -EIO; 4012 } 4013 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4014 ns = drbd_read_state(device); 4015 spin_unlock_irq(&device->resource->req_lock); 4016 4017 if (rv < SS_SUCCESS) { 4018 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4019 return -EIO; 4020 } 4021 4022 if (os.conn > C_WF_REPORT_PARAMS) { 4023 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4024 peer_state.disk != D_NEGOTIATING ) { 4025 /* we want resync, peer has not yet decided to sync... */ 4026 /* Nowadays only used when forcing a node into primary role and 4027 setting its disk to UpToDate with that */ 4028 drbd_send_uuids(peer_device); 4029 drbd_send_current_state(peer_device); 4030 } 4031 } 4032 4033 clear_bit(DISCARD_MY_DATA, &device->flags); 4034 4035 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4036 4037 return 0; 4038 } 4039 4040 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4041 { 4042 struct drbd_peer_device *peer_device; 4043 struct drbd_device *device; 4044 struct p_rs_uuid *p = pi->data; 4045 4046 peer_device = conn_peer_device(connection, pi->vnr); 4047 if (!peer_device) 4048 return -EIO; 4049 device = peer_device->device; 4050 4051 wait_event(device->misc_wait, 4052 device->state.conn == C_WF_SYNC_UUID || 4053 device->state.conn == C_BEHIND || 4054 device->state.conn < C_CONNECTED || 4055 device->state.disk < D_NEGOTIATING); 4056 4057 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4058 4059 /* Here the _drbd_uuid_ functions are right, current should 4060 _not_ be rotated into the history */ 4061 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4062 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4063 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4064 4065 drbd_print_uuids(device, "updated sync uuid"); 4066 drbd_start_resync(device, C_SYNC_TARGET); 4067 4068 put_ldev(device); 4069 } else 4070 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4071 4072 return 0; 4073 } 4074 4075 /** 4076 * receive_bitmap_plain 4077 * 4078 * Return 0 when done, 1 when another iteration is needed, and a negative error 4079 * code upon failure. 
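 * The caller, receive_bitmap(), keeps calling this for one P_BITMAP packet
 * after another, as long as 1 is returned.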
4080 */ 4081 static int 4082 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4083 unsigned long *p, struct bm_xfer_ctx *c) 4084 { 4085 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4086 drbd_header_size(peer_device->connection); 4087 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4088 c->bm_words - c->word_offset); 4089 unsigned int want = num_words * sizeof(*p); 4090 int err; 4091 4092 if (want != size) { 4093 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4094 return -EIO; 4095 } 4096 if (want == 0) 4097 return 0; 4098 err = drbd_recv_all(peer_device->connection, p, want); 4099 if (err) 4100 return err; 4101 4102 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4103 4104 c->word_offset += num_words; 4105 c->bit_offset = c->word_offset * BITS_PER_LONG; 4106 if (c->bit_offset > c->bm_bits) 4107 c->bit_offset = c->bm_bits; 4108 4109 return 1; 4110 } 4111 4112 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4113 { 4114 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4115 } 4116 4117 static int dcbp_get_start(struct p_compressed_bm *p) 4118 { 4119 return (p->encoding & 0x80) != 0; 4120 } 4121 4122 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4123 { 4124 return (p->encoding >> 4) & 0x7; 4125 } 4126 4127 /** 4128 * recv_bm_rle_bits 4129 * 4130 * Return 0 when done, 1 when another iteration is needed, and a negative error 4131 * code upon failure. 4132 */ 4133 static int 4134 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4135 struct p_compressed_bm *p, 4136 struct bm_xfer_ctx *c, 4137 unsigned int len) 4138 { 4139 struct bitstream bs; 4140 u64 look_ahead; 4141 u64 rl; 4142 u64 tmp; 4143 unsigned long s = c->bit_offset; 4144 unsigned long e; 4145 int toggle = dcbp_get_start(p); 4146 int have; 4147 int bits; 4148 4149 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4150 4151 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4152 if (bits < 0) 4153 return -EIO; 4154 4155 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4156 bits = vli_decode_bits(&rl, look_ahead); 4157 if (bits <= 0) 4158 return -EIO; 4159 4160 if (toggle) { 4161 e = s + rl -1; 4162 if (e >= c->bm_bits) { 4163 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4164 return -EIO; 4165 } 4166 _drbd_bm_set_bits(peer_device->device, s, e); 4167 } 4168 4169 if (have < bits) { 4170 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4171 have, bits, look_ahead, 4172 (unsigned int)(bs.cur.b - p->code), 4173 (unsigned int)bs.buf_len); 4174 return -EIO; 4175 } 4176 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4177 if (likely(bits < 64)) 4178 look_ahead >>= bits; 4179 else 4180 look_ahead = 0; 4181 have -= bits; 4182 4183 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4184 if (bits < 0) 4185 return -EIO; 4186 look_ahead |= tmp << have; 4187 have += bits; 4188 } 4189 4190 c->bit_offset = s; 4191 bm_xfer_ctx_bit_to_word_offset(c); 4192 4193 return (s != c->bm_bits); 4194 } 4195 4196 /** 4197 * decode_bitmap_c 4198 * 4199 * Return 0 when done, 1 when another iteration is needed, and a negative error 4200 * code upon failure. 
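 * Only the RLE_VLI_Bits encoding is handled here; any other encoding code
 * is reported as a protocol error.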
 */
static int
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	return -EIO;
}

void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total cannot be zero, but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian,
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, a negative error code otherwise. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	for (;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled!
*/ 4291 struct p_compressed_bm *p = pi->data; 4292 4293 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) { 4294 drbd_err(device, "ReportCBitmap packet too large\n"); 4295 err = -EIO; 4296 goto out; 4297 } 4298 if (pi->size <= sizeof(*p)) { 4299 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size); 4300 err = -EIO; 4301 goto out; 4302 } 4303 err = drbd_recv_all(peer_device->connection, p, pi->size); 4304 if (err) 4305 goto out; 4306 err = decode_bitmap_c(peer_device, p, &c, pi->size); 4307 } else { 4308 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd); 4309 err = -EIO; 4310 goto out; 4311 } 4312 4313 c.packets[pi->cmd == P_BITMAP]++; 4314 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size; 4315 4316 if (err <= 0) { 4317 if (err < 0) 4318 goto out; 4319 break; 4320 } 4321 err = drbd_recv_header(peer_device->connection, pi); 4322 if (err) 4323 goto out; 4324 } 4325 4326 INFO_bm_xfer_stats(device, "receive", &c); 4327 4328 if (device->state.conn == C_WF_BITMAP_T) { 4329 enum drbd_state_rv rv; 4330 4331 err = drbd_send_bitmap(device); 4332 if (err) 4333 goto out; 4334 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 4335 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4336 D_ASSERT(device, rv == SS_SUCCESS); 4337 } else if (device->state.conn != C_WF_BITMAP_S) { 4338 /* admin may have requested C_DISCONNECTING, 4339 * other threads may have noticed network errors */ 4340 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4341 drbd_conn_str(device->state.conn)); 4342 } 4343 err = 0; 4344 4345 out: 4346 drbd_bm_unlock(device); 4347 if (!err && device->state.conn == C_WF_BITMAP_S) 4348 drbd_start_resync(device, C_SYNC_SOURCE); 4349 return err; 4350 } 4351 4352 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4353 { 4354 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4355 pi->cmd, pi->size); 4356 4357 return ignore_remaining_packet(connection, pi); 4358 } 4359 4360 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4361 { 4362 /* Make sure we've acked all the TCP data associated 4363 * with the data requests being unplugged */ 4364 drbd_tcp_quickack(connection->data.socket); 4365 4366 return 0; 4367 } 4368 4369 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4370 { 4371 struct drbd_peer_device *peer_device; 4372 struct drbd_device *device; 4373 struct p_block_desc *p = pi->data; 4374 4375 peer_device = conn_peer_device(connection, pi->vnr); 4376 if (!peer_device) 4377 return -EIO; 4378 device = peer_device->device; 4379 4380 switch (device->state.conn) { 4381 case C_WF_SYNC_UUID: 4382 case C_WF_BITMAP_T: 4383 case C_BEHIND: 4384 break; 4385 default: 4386 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4387 drbd_conn_str(device->state.conn)); 4388 } 4389 4390 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4391 4392 return 0; 4393 } 4394 4395 struct data_cmd { 4396 int expect_payload; 4397 size_t pkt_size; 4398 int (*fn)(struct drbd_connection *, struct packet_info *); 4399 }; 4400 4401 static struct data_cmd drbd_cmd_handler[] = { 4402 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4403 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4404 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), 
receive_RSDataReply } , 4405 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4406 [P_BITMAP] = { 1, 0, receive_bitmap } , 4407 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4408 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4409 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4410 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4411 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4412 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4413 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4414 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4415 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4416 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4417 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4418 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4419 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4420 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4421 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4422 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4423 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4424 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4425 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4426 }; 4427 4428 static void drbdd(struct drbd_connection *connection) 4429 { 4430 struct packet_info pi; 4431 size_t shs; /* sub header size */ 4432 int err; 4433 4434 while (get_t_state(&connection->receiver) == RUNNING) { 4435 struct data_cmd *cmd; 4436 4437 drbd_thread_current_set_cpu(&connection->receiver); 4438 if (drbd_recv_header(connection, &pi)) 4439 goto err_out; 4440 4441 cmd = &drbd_cmd_handler[pi.cmd]; 4442 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4443 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4444 cmdname(pi.cmd), pi.cmd); 4445 goto err_out; 4446 } 4447 4448 shs = cmd->pkt_size; 4449 if (pi.size > shs && !cmd->expect_payload) { 4450 drbd_err(connection, "No payload expected %s l:%d\n", 4451 cmdname(pi.cmd), pi.size); 4452 goto err_out; 4453 } 4454 4455 if (shs) { 4456 err = drbd_recv_all_warn(connection, pi.data, shs); 4457 if (err) 4458 goto err_out; 4459 pi.size -= shs; 4460 } 4461 4462 err = cmd->fn(connection, &pi); 4463 if (err) { 4464 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4465 cmdname(pi.cmd), err, pi.size); 4466 goto err_out; 4467 } 4468 } 4469 return; 4470 4471 err_out: 4472 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4473 } 4474 4475 static void conn_disconnect(struct drbd_connection *connection) 4476 { 4477 struct drbd_peer_device *peer_device; 4478 enum drbd_conns oc; 4479 int vnr; 4480 4481 if (connection->cstate == C_STANDALONE) 4482 return; 4483 4484 /* We are about to start the cleanup after connection loss. 4485 * Make sure drbd_make_request knows about that. 4486 * Usually we should be in some network failure state already, 4487 * but just in case we are not, we fix it up here. 4488 */ 4489 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4490 4491 /* asender does not clean up anything. 
it must not interfere, either */ 4492 drbd_thread_stop(&connection->asender); 4493 drbd_free_sock(connection); 4494 4495 rcu_read_lock(); 4496 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 4497 struct drbd_device *device = peer_device->device; 4498 kref_get(&device->kref); 4499 rcu_read_unlock(); 4500 drbd_disconnected(peer_device); 4501 kref_put(&device->kref, drbd_destroy_device); 4502 rcu_read_lock(); 4503 } 4504 rcu_read_unlock(); 4505 4506 if (!list_empty(&connection->current_epoch->list)) 4507 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 4508 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4509 atomic_set(&connection->current_epoch->epoch_size, 0); 4510 connection->send.seen_any_write_yet = false; 4511 4512 drbd_info(connection, "Connection closed\n"); 4513 4514 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 4515 conn_try_outdate_peer_async(connection); 4516 4517 spin_lock_irq(&connection->resource->req_lock); 4518 oc = connection->cstate; 4519 if (oc >= C_UNCONNECTED) 4520 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 4521 4522 spin_unlock_irq(&connection->resource->req_lock); 4523 4524 if (oc == C_DISCONNECTING) 4525 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 4526 } 4527 4528 static int drbd_disconnected(struct drbd_peer_device *peer_device) 4529 { 4530 struct drbd_device *device = peer_device->device; 4531 unsigned int i; 4532 4533 /* wait for current activity to cease. */ 4534 spin_lock_irq(&device->resource->req_lock); 4535 _drbd_wait_ee_list_empty(device, &device->active_ee); 4536 _drbd_wait_ee_list_empty(device, &device->sync_ee); 4537 _drbd_wait_ee_list_empty(device, &device->read_ee); 4538 spin_unlock_irq(&device->resource->req_lock); 4539 4540 /* We do not have data structures that would allow us to 4541 * get the rs_pending_cnt down to 0 again. 4542 * * On C_SYNC_TARGET we do not have any data structures describing 4543 * the pending RSDataRequest's we have sent. 4544 * * On C_SYNC_SOURCE there is no data structure that tracks 4545 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 4546 * And no, it is not the sum of the reference counts in the 4547 * resync_LRU. The resync_LRU tracks the whole operation including 4548 * the disk-IO, while the rs_pending_cnt only tracks the blocks 4549 * on the fly. */ 4550 drbd_rs_cancel_all(device); 4551 device->rs_total = 0; 4552 device->rs_failed = 0; 4553 atomic_set(&device->rs_pending_cnt, 0); 4554 wake_up(&device->misc_wait); 4555 4556 del_timer_sync(&device->resync_timer); 4557 resync_timer_fn((unsigned long)device); 4558 4559 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 4560 * w_make_resync_request etc. which may still be on the worker queue 4561 * to be "canceled" */ 4562 drbd_flush_workqueue(&peer_device->connection->sender_work); 4563 4564 drbd_finish_peer_reqs(device); 4565 4566 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs() 4567 might have issued a work again. The one before drbd_finish_peer_reqs() is 4568 necessary to reclain net_ee in drbd_finish_peer_reqs(). */ 4569 drbd_flush_workqueue(&peer_device->connection->sender_work); 4570 4571 /* need to do it again, drbd_finish_peer_reqs() may have populated it 4572 * again via drbd_try_clear_on_disk_bm(). 
*/ 4573 drbd_rs_cancel_all(device); 4574 4575 kfree(device->p_uuid); 4576 device->p_uuid = NULL; 4577 4578 if (!drbd_suspended(device)) 4579 tl_clear(peer_device->connection); 4580 4581 drbd_md_sync(device); 4582 4583 /* serialize with bitmap writeout triggered by the state change, 4584 * if any. */ 4585 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags)); 4586 4587 /* tcp_close and release of sendpage pages can be deferred. I don't 4588 * want to use SO_LINGER, because apparently it can be deferred for 4589 * more than 20 seconds (longest time I checked). 4590 * 4591 * Actually we don't care for exactly when the network stack does its 4592 * put_page(), but release our reference on these pages right here. 4593 */ 4594 i = drbd_free_peer_reqs(device, &device->net_ee); 4595 if (i) 4596 drbd_info(device, "net_ee not empty, killed %u entries\n", i); 4597 i = atomic_read(&device->pp_in_use_by_net); 4598 if (i) 4599 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 4600 i = atomic_read(&device->pp_in_use); 4601 if (i) 4602 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 4603 4604 D_ASSERT(device, list_empty(&device->read_ee)); 4605 D_ASSERT(device, list_empty(&device->active_ee)); 4606 D_ASSERT(device, list_empty(&device->sync_ee)); 4607 D_ASSERT(device, list_empty(&device->done_ee)); 4608 4609 return 0; 4610 } 4611 4612 /* 4613 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 4614 * we can agree on is stored in agreed_pro_version. 4615 * 4616 * feature flags and the reserved array should be enough room for future 4617 * enhancements of the handshake protocol, and possible plugins... 4618 * 4619 * for now, they are expected to be zero, but ignored. 4620 */ 4621 static int drbd_send_features(struct drbd_connection *connection) 4622 { 4623 struct drbd_socket *sock; 4624 struct p_connection_features *p; 4625 4626 sock = &connection->data; 4627 p = conn_prepare_command(connection, sock); 4628 if (!p) 4629 return -EIO; 4630 memset(p, 0, sizeof(*p)); 4631 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 4632 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 4633 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 4634 } 4635 4636 /* 4637 * return values: 4638 * 1 yes, we have a valid connection 4639 * 0 oops, did not work out, please try again 4640 * -1 peer talks different language, 4641 * no point in trying again, please go standalone. 4642 */ 4643 static int drbd_do_features(struct drbd_connection *connection) 4644 { 4645 /* ASSERT current == connection->receiver ... 
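   The exchange below is symmetric: both sides send P_CONNECTION_FEATURES
   and then agree on min(PRO_VERSION_MAX, peer's protocol_max).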
*/ 4646 struct p_connection_features *p; 4647 const int expect = sizeof(struct p_connection_features); 4648 struct packet_info pi; 4649 int err; 4650 4651 err = drbd_send_features(connection); 4652 if (err) 4653 return 0; 4654 4655 err = drbd_recv_header(connection, &pi); 4656 if (err) 4657 return 0; 4658 4659 if (pi.cmd != P_CONNECTION_FEATURES) { 4660 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n", 4661 cmdname(pi.cmd), pi.cmd); 4662 return -1; 4663 } 4664 4665 if (pi.size != expect) { 4666 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n", 4667 expect, pi.size); 4668 return -1; 4669 } 4670 4671 p = pi.data; 4672 err = drbd_recv_all_warn(connection, p, expect); 4673 if (err) 4674 return 0; 4675 4676 p->protocol_min = be32_to_cpu(p->protocol_min); 4677 p->protocol_max = be32_to_cpu(p->protocol_max); 4678 if (p->protocol_max == 0) 4679 p->protocol_max = p->protocol_min; 4680 4681 if (PRO_VERSION_MAX < p->protocol_min || 4682 PRO_VERSION_MIN > p->protocol_max) 4683 goto incompat; 4684 4685 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max); 4686 4687 drbd_info(connection, "Handshake successful: " 4688 "Agreed network protocol version %d\n", connection->agreed_pro_version); 4689 4690 return 1; 4691 4692 incompat: 4693 drbd_err(connection, "incompatible DRBD dialects: " 4694 "I support %d-%d, peer supports %d-%d\n", 4695 PRO_VERSION_MIN, PRO_VERSION_MAX, 4696 p->protocol_min, p->protocol_max); 4697 return -1; 4698 } 4699 4700 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE) 4701 static int drbd_do_auth(struct drbd_connection *connection) 4702 { 4703 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n"); 4704 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n"); 4705 return -1; 4706 } 4707 #else 4708 #define CHALLENGE_LEN 64 4709 4710 /* Return value: 4711 1 - auth succeeded, 4712 0 - failed, try again (network error), 4713 -1 - auth failed, don't try again. 4714 */ 4715 4716 static int drbd_do_auth(struct drbd_connection *connection) 4717 { 4718 struct drbd_socket *sock; 4719 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */ 4720 struct scatterlist sg; 4721 char *response = NULL; 4722 char *right_response = NULL; 4723 char *peers_ch = NULL; 4724 unsigned int key_len; 4725 char secret[SHARED_SECRET_MAX]; /* 64 byte */ 4726 unsigned int resp_size; 4727 struct hash_desc desc; 4728 struct packet_info pi; 4729 struct net_conf *nc; 4730 int err, rv; 4731 4732 /* FIXME: Put the challenge/response into the preallocated socket buffer. 
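   The exchange implemented below (both peers do the same):
     send P_AUTH_CHALLENGE  - CHALLENGE_LEN random bytes
     recv P_AUTH_CHALLENGE  - the peer's challenge
     send P_AUTH_RESPONSE   - HMAC(shared_secret, peer's challenge)
     recv P_AUTH_RESPONSE   - the peer's HMAC of our challenge
   and finally compare that response against our own
   HMAC(shared_secret, my_challenge).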
*/ 4733 4734 rcu_read_lock(); 4735 nc = rcu_dereference(connection->net_conf); 4736 key_len = strlen(nc->shared_secret); 4737 memcpy(secret, nc->shared_secret, key_len); 4738 rcu_read_unlock(); 4739 4740 desc.tfm = connection->cram_hmac_tfm; 4741 desc.flags = 0; 4742 4743 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 4744 if (rv) { 4745 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv); 4746 rv = -1; 4747 goto fail; 4748 } 4749 4750 get_random_bytes(my_challenge, CHALLENGE_LEN); 4751 4752 sock = &connection->data; 4753 if (!conn_prepare_command(connection, sock)) { 4754 rv = 0; 4755 goto fail; 4756 } 4757 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 4758 my_challenge, CHALLENGE_LEN); 4759 if (!rv) 4760 goto fail; 4761 4762 err = drbd_recv_header(connection, &pi); 4763 if (err) { 4764 rv = 0; 4765 goto fail; 4766 } 4767 4768 if (pi.cmd != P_AUTH_CHALLENGE) { 4769 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 4770 cmdname(pi.cmd), pi.cmd); 4771 rv = 0; 4772 goto fail; 4773 } 4774 4775 if (pi.size > CHALLENGE_LEN * 2) { 4776 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 4777 rv = -1; 4778 goto fail; 4779 } 4780 4781 peers_ch = kmalloc(pi.size, GFP_NOIO); 4782 if (peers_ch == NULL) { 4783 drbd_err(connection, "kmalloc of peers_ch failed\n"); 4784 rv = -1; 4785 goto fail; 4786 } 4787 4788 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 4789 if (err) { 4790 rv = 0; 4791 goto fail; 4792 } 4793 4794 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm); 4795 response = kmalloc(resp_size, GFP_NOIO); 4796 if (response == NULL) { 4797 drbd_err(connection, "kmalloc of response failed\n"); 4798 rv = -1; 4799 goto fail; 4800 } 4801 4802 sg_init_table(&sg, 1); 4803 sg_set_buf(&sg, peers_ch, pi.size); 4804 4805 rv = crypto_hash_digest(&desc, &sg, sg.length, response); 4806 if (rv) { 4807 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4808 rv = -1; 4809 goto fail; 4810 } 4811 4812 if (!conn_prepare_command(connection, sock)) { 4813 rv = 0; 4814 goto fail; 4815 } 4816 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 4817 response, resp_size); 4818 if (!rv) 4819 goto fail; 4820 4821 err = drbd_recv_header(connection, &pi); 4822 if (err) { 4823 rv = 0; 4824 goto fail; 4825 } 4826 4827 if (pi.cmd != P_AUTH_RESPONSE) { 4828 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 4829 cmdname(pi.cmd), pi.cmd); 4830 rv = 0; 4831 goto fail; 4832 } 4833 4834 if (pi.size != resp_size) { 4835 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 4836 rv = 0; 4837 goto fail; 4838 } 4839 4840 err = drbd_recv_all_warn(connection, response , resp_size); 4841 if (err) { 4842 rv = 0; 4843 goto fail; 4844 } 4845 4846 right_response = kmalloc(resp_size, GFP_NOIO); 4847 if (right_response == NULL) { 4848 drbd_err(connection, "kmalloc of right_response failed\n"); 4849 rv = -1; 4850 goto fail; 4851 } 4852 4853 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN); 4854 4855 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response); 4856 if (rv) { 4857 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4858 rv = -1; 4859 goto fail; 4860 } 4861 4862 rv = !memcmp(response, right_response, resp_size); 4863 4864 if (rv) 4865 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 4866 resp_size); 4867 else 4868 rv = -1; 4869 4870 fail: 4871 kfree(peers_ch); 4872 kfree(response); 4873 kfree(right_response); 
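	/* rv: 1 => peer authenticated,
	 *     0 => transient (network) failure, caller may retry,
	 *    -1 => authentication failed, do not retry. */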
4874 4875 return rv; 4876 } 4877 #endif 4878 4879 int drbd_receiver(struct drbd_thread *thi) 4880 { 4881 struct drbd_connection *connection = thi->connection; 4882 int h; 4883 4884 drbd_info(connection, "receiver (re)started\n"); 4885 4886 do { 4887 h = conn_connect(connection); 4888 if (h == 0) { 4889 conn_disconnect(connection); 4890 schedule_timeout_interruptible(HZ); 4891 } 4892 if (h == -1) { 4893 drbd_warn(connection, "Discarding network configuration.\n"); 4894 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 4895 } 4896 } while (h == 0); 4897 4898 if (h > 0) 4899 drbdd(connection); 4900 4901 conn_disconnect(connection); 4902 4903 drbd_info(connection, "receiver terminated\n"); 4904 return 0; 4905 } 4906 4907 /* ********* acknowledge sender ******** */ 4908 4909 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 4910 { 4911 struct p_req_state_reply *p = pi->data; 4912 int retcode = be32_to_cpu(p->retcode); 4913 4914 if (retcode >= SS_SUCCESS) { 4915 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 4916 } else { 4917 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 4918 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 4919 drbd_set_st_err_str(retcode), retcode); 4920 } 4921 wake_up(&connection->ping_wait); 4922 4923 return 0; 4924 } 4925 4926 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 4927 { 4928 struct drbd_peer_device *peer_device; 4929 struct drbd_device *device; 4930 struct p_req_state_reply *p = pi->data; 4931 int retcode = be32_to_cpu(p->retcode); 4932 4933 peer_device = conn_peer_device(connection, pi->vnr); 4934 if (!peer_device) 4935 return -EIO; 4936 device = peer_device->device; 4937 4938 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 4939 D_ASSERT(device, connection->agreed_pro_version < 100); 4940 return got_conn_RqSReply(connection, pi); 4941 } 4942 4943 if (retcode >= SS_SUCCESS) { 4944 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 4945 } else { 4946 set_bit(CL_ST_CHG_FAIL, &device->flags); 4947 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 4948 drbd_set_st_err_str(retcode), retcode); 4949 } 4950 wake_up(&device->state_wait); 4951 4952 return 0; 4953 } 4954 4955 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 4956 { 4957 return drbd_send_ping_ack(connection); 4958 4959 } 4960 4961 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 4962 { 4963 /* restore idle timeout */ 4964 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 4965 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 4966 wake_up(&connection->ping_wait); 4967 4968 return 0; 4969 } 4970 4971 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 4972 { 4973 struct drbd_peer_device *peer_device; 4974 struct drbd_device *device; 4975 struct p_block_ack *p = pi->data; 4976 sector_t sector = be64_to_cpu(p->sector); 4977 int blksize = be32_to_cpu(p->blksize); 4978 4979 peer_device = conn_peer_device(connection, pi->vnr); 4980 if (!peer_device) 4981 return -EIO; 4982 device = peer_device->device; 4983 4984 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 4985 4986 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 4987 4988 if (get_ldev(device)) { 4989 drbd_rs_complete_io(device, sector); 4990 drbd_set_in_sync(device, sector, blksize); 4991 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 4992 
device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 4993 put_ldev(device); 4994 } 4995 dec_rs_pending(device); 4996 atomic_add(blksize >> 9, &device->rs_sect_in); 4997 4998 return 0; 4999 } 5000 5001 static int 5002 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector, 5003 struct rb_root *root, const char *func, 5004 enum drbd_req_event what, bool missing_ok) 5005 { 5006 struct drbd_request *req; 5007 struct bio_and_error m; 5008 5009 spin_lock_irq(&device->resource->req_lock); 5010 req = find_request(device, root, id, sector, missing_ok, func); 5011 if (unlikely(!req)) { 5012 spin_unlock_irq(&device->resource->req_lock); 5013 return -EIO; 5014 } 5015 __req_mod(req, what, &m); 5016 spin_unlock_irq(&device->resource->req_lock); 5017 5018 if (m.bio) 5019 complete_master_bio(device, &m); 5020 return 0; 5021 } 5022 5023 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5024 { 5025 struct drbd_peer_device *peer_device; 5026 struct drbd_device *device; 5027 struct p_block_ack *p = pi->data; 5028 sector_t sector = be64_to_cpu(p->sector); 5029 int blksize = be32_to_cpu(p->blksize); 5030 enum drbd_req_event what; 5031 5032 peer_device = conn_peer_device(connection, pi->vnr); 5033 if (!peer_device) 5034 return -EIO; 5035 device = peer_device->device; 5036 5037 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5038 5039 if (p->block_id == ID_SYNCER) { 5040 drbd_set_in_sync(device, sector, blksize); 5041 dec_rs_pending(device); 5042 return 0; 5043 } 5044 switch (pi->cmd) { 5045 case P_RS_WRITE_ACK: 5046 what = WRITE_ACKED_BY_PEER_AND_SIS; 5047 break; 5048 case P_WRITE_ACK: 5049 what = WRITE_ACKED_BY_PEER; 5050 break; 5051 case P_RECV_ACK: 5052 what = RECV_ACKED_BY_PEER; 5053 break; 5054 case P_SUPERSEDED: 5055 what = CONFLICT_RESOLVED; 5056 break; 5057 case P_RETRY_WRITE: 5058 what = POSTPONE_WRITE; 5059 break; 5060 default: 5061 BUG(); 5062 } 5063 5064 return validate_req_change_req_state(device, p->block_id, sector, 5065 &device->write_requests, __func__, 5066 what, false); 5067 } 5068 5069 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5070 { 5071 struct drbd_peer_device *peer_device; 5072 struct drbd_device *device; 5073 struct p_block_ack *p = pi->data; 5074 sector_t sector = be64_to_cpu(p->sector); 5075 int size = be32_to_cpu(p->blksize); 5076 int err; 5077 5078 peer_device = conn_peer_device(connection, pi->vnr); 5079 if (!peer_device) 5080 return -EIO; 5081 device = peer_device->device; 5082 5083 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5084 5085 if (p->block_id == ID_SYNCER) { 5086 dec_rs_pending(device); 5087 drbd_rs_failed_io(device, sector, size); 5088 return 0; 5089 } 5090 5091 err = validate_req_change_req_state(device, p->block_id, sector, 5092 &device->write_requests, __func__, 5093 NEG_ACKED, true); 5094 if (err) { 5095 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5096 The master bio might already be completed, therefore the 5097 request is no longer in the collision hash. */ 5098 /* In Protocol B we might already have got a P_RECV_ACK 5099 but then get a P_NEG_ACK afterwards. 
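   In both cases simply marking the block out of sync is safe; it will be
   resynced later.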
*/ 5100 drbd_set_out_of_sync(device, sector, size); 5101 } 5102 return 0; 5103 } 5104 5105 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5106 { 5107 struct drbd_peer_device *peer_device; 5108 struct drbd_device *device; 5109 struct p_block_ack *p = pi->data; 5110 sector_t sector = be64_to_cpu(p->sector); 5111 5112 peer_device = conn_peer_device(connection, pi->vnr); 5113 if (!peer_device) 5114 return -EIO; 5115 device = peer_device->device; 5116 5117 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5118 5119 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5120 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5121 5122 return validate_req_change_req_state(device, p->block_id, sector, 5123 &device->read_requests, __func__, 5124 NEG_ACKED, false); 5125 } 5126 5127 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5128 { 5129 struct drbd_peer_device *peer_device; 5130 struct drbd_device *device; 5131 sector_t sector; 5132 int size; 5133 struct p_block_ack *p = pi->data; 5134 5135 peer_device = conn_peer_device(connection, pi->vnr); 5136 if (!peer_device) 5137 return -EIO; 5138 device = peer_device->device; 5139 5140 sector = be64_to_cpu(p->sector); 5141 size = be32_to_cpu(p->blksize); 5142 5143 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5144 5145 dec_rs_pending(device); 5146 5147 if (get_ldev_if_state(device, D_FAILED)) { 5148 drbd_rs_complete_io(device, sector); 5149 switch (pi->cmd) { 5150 case P_NEG_RS_DREPLY: 5151 drbd_rs_failed_io(device, sector, size); 5152 case P_RS_CANCEL: 5153 break; 5154 default: 5155 BUG(); 5156 } 5157 put_ldev(device); 5158 } 5159 5160 return 0; 5161 } 5162 5163 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5164 { 5165 struct p_barrier_ack *p = pi->data; 5166 struct drbd_peer_device *peer_device; 5167 int vnr; 5168 5169 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5170 5171 rcu_read_lock(); 5172 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5173 struct drbd_device *device = peer_device->device; 5174 5175 if (device->state.conn == C_AHEAD && 5176 atomic_read(&device->ap_in_flight) == 0 && 5177 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5178 device->start_resync_timer.expires = jiffies + HZ; 5179 add_timer(&device->start_resync_timer); 5180 } 5181 } 5182 rcu_read_unlock(); 5183 5184 return 0; 5185 } 5186 5187 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5188 { 5189 struct drbd_peer_device *peer_device; 5190 struct drbd_device *device; 5191 struct p_block_ack *p = pi->data; 5192 struct drbd_device_work *dw; 5193 sector_t sector; 5194 int size; 5195 5196 peer_device = conn_peer_device(connection, pi->vnr); 5197 if (!peer_device) 5198 return -EIO; 5199 device = peer_device->device; 5200 5201 sector = be64_to_cpu(p->sector); 5202 size = be32_to_cpu(p->blksize); 5203 5204 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5205 5206 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5207 drbd_ov_out_of_sync_found(device, sector, size); 5208 else 5209 ov_out_of_sync_print(device); 5210 5211 if (!get_ldev(device)) 5212 return 0; 5213 5214 drbd_rs_complete_io(device, sector); 5215 dec_rs_pending(device); 5216 5217 --device->ov_left; 5218 5219 /* let's advance progress step marks only for every other megabyte */ 5220 if ((device->ov_left & 0x200) == 0x200) 5221 drbd_advance_rs_marks(device, device->ov_left); 5222 5223 if (device->ov_left 
== 0) {
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

static int connection_finish_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &connection->flags);
		flush_signals(current);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			kref_get(&device->kref);
			rcu_read_unlock();
			if (drbd_finish_peer_reqs(device)) {
				kref_put(&device->kref, drbd_destroy_device);
				return 1;
			}
			kref_put(&device->kref, drbd_destroy_device);
			rcu_read_lock();
		}
		set_bit(SIGNAL_ASENDER, &connection->flags);

		spin_lock_irq(&connection->resource->req_lock);
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			not_empty = !list_empty(&device->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&connection->resource->req_lock);
		rcu_read_unlock();
	} while (not_empty);

	return 0;
}

struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};

int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv); 5324 5325 while (get_t_state(thi) == RUNNING) { 5326 drbd_thread_current_set_cpu(thi); 5327 5328 rcu_read_lock(); 5329 nc = rcu_dereference(connection->net_conf); 5330 ping_timeo = nc->ping_timeo; 5331 tcp_cork = nc->tcp_cork; 5332 ping_int = nc->ping_int; 5333 rcu_read_unlock(); 5334 5335 if (test_and_clear_bit(SEND_PING, &connection->flags)) { 5336 if (drbd_send_ping(connection)) { 5337 drbd_err(connection, "drbd_send_ping has failed\n"); 5338 goto reconnect; 5339 } 5340 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10; 5341 ping_timeout_active = true; 5342 } 5343 5344 /* TODO: conditionally cork; it may hurt latency if we cork without 5345 much to send */ 5346 if (tcp_cork) 5347 drbd_tcp_cork(connection->meta.socket); 5348 if (connection_finish_peer_reqs(connection)) { 5349 drbd_err(connection, "connection_finish_peer_reqs() failed\n"); 5350 goto reconnect; 5351 } 5352 /* but unconditionally uncork unless disabled */ 5353 if (tcp_cork) 5354 drbd_tcp_uncork(connection->meta.socket); 5355 5356 /* short circuit, recv_msg would return EINTR anyways. */ 5357 if (signal_pending(current)) 5358 continue; 5359 5360 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0); 5361 clear_bit(SIGNAL_ASENDER, &connection->flags); 5362 5363 flush_signals(current); 5364 5365 /* Note: 5366 * -EINTR (on meta) we got a signal 5367 * -EAGAIN (on meta) rcvtimeo expired 5368 * -ECONNRESET other side closed the connection 5369 * -ERESTARTSYS (on data) we got a signal 5370 * rv < 0 other than above: unexpected error! 5371 * rv == expected: full header or command 5372 * rv < expected: "woken" by signal during receive 5373 * rv == 0 : "connection shut down by peer" 5374 */ 5375 if (likely(rv > 0)) { 5376 received += rv; 5377 buf += rv; 5378 } else if (rv == 0) { 5379 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 5380 long t; 5381 rcu_read_lock(); 5382 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 5383 rcu_read_unlock(); 5384 5385 t = wait_event_timeout(connection->ping_wait, 5386 connection->cstate < C_WF_REPORT_PARAMS, 5387 t); 5388 if (t) 5389 break; 5390 } 5391 drbd_err(connection, "meta connection shut down by peer.\n"); 5392 goto reconnect; 5393 } else if (rv == -EAGAIN) { 5394 /* If the data socket received something meanwhile, 5395 * that is good enough: peer is still alive. 
*/ 5396 if (time_after(connection->last_received, 5397 jiffies - connection->meta.socket->sk->sk_rcvtimeo)) 5398 continue; 5399 if (ping_timeout_active) { 5400 drbd_err(connection, "PingAck did not arrive in time.\n"); 5401 goto reconnect; 5402 } 5403 set_bit(SEND_PING, &connection->flags); 5404 continue; 5405 } else if (rv == -EINTR) { 5406 continue; 5407 } else { 5408 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 5409 goto reconnect; 5410 } 5411 5412 if (received == expect && cmd == NULL) { 5413 if (decode_header(connection, connection->meta.rbuf, &pi)) 5414 goto reconnect; 5415 cmd = &asender_tbl[pi.cmd]; 5416 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) { 5417 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n", 5418 cmdname(pi.cmd), pi.cmd); 5419 goto disconnect; 5420 } 5421 expect = header_size + cmd->pkt_size; 5422 if (pi.size != expect - header_size) { 5423 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n", 5424 pi.cmd, pi.size); 5425 goto reconnect; 5426 } 5427 } 5428 if (received == expect) { 5429 bool err; 5430 5431 err = cmd->fn(connection, &pi); 5432 if (err) { 5433 drbd_err(connection, "%pf failed\n", cmd->fn); 5434 goto reconnect; 5435 } 5436 5437 connection->last_received = jiffies; 5438 5439 if (cmd == &asender_tbl[P_PING_ACK]) { 5440 /* restore idle timeout */ 5441 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ; 5442 ping_timeout_active = false; 5443 } 5444 5445 buf = connection->meta.rbuf; 5446 received = 0; 5447 expect = header_size; 5448 cmd = NULL; 5449 } 5450 } 5451 5452 if (0) { 5453 reconnect: 5454 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 5455 conn_md_sync(connection); 5456 } 5457 if (0) { 5458 disconnect: 5459 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5460 } 5461 clear_bit(SIGNAL_ASENDER, &connection->flags); 5462 5463 drbd_info(connection, "asender terminated\n"); 5464 5465 return 0; 5466 } 5467