/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
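
/*
 * Example (illustration only, not functional code): with three pages A, B, C
 * chained as A->private == B, B->private == C, C->private == 0, and
 * *head == A, page_chain_del(&head, 2) returns A, terminates the returned
 * chain by clearing B->private, and leaves *head pointing at C.
 */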

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}
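
/*
 * Summary of the allocation/free pairing implemented below: page chains
 * handed out by drbd_alloc_pages() are charged to device->pp_in_use;
 * drbd_free_pages() later returns a chain either to the global drbd_pp_pool
 * or to the system, drops the matching counter (pp_in_use or
 * pp_in_use_by_net) and wakes waiters on drbd_pp_wait.
 */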

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (has_payload && data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}
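
/*
 * Note: prepare_listen_socket() above installs drbd_incoming_connection() as
 * the sk_state_change callback, so ad->door_bell is completed as soon as an
 * incoming connection reaches TCP_ESTABLISHED.  drbd_wait_for_connect() below
 * waits on that completion (with a randomized timeout) and only then calls
 * kernel_accept().
 */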

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better. */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock.socket && msock.socket) {
			rcu_read_lock();
			nc = rcu_dereference(connection->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->asender);

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			rv = blkdev_issue_flush(device->ldev->backing_bdev,
						GFP_NOIO, NULL);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @device:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
	struct disk_conf *dc;
	struct drbd_peer_device *peer_device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = connection->write_ordering;
	wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (!get_ldev_if_state(device, D_ATTACHING))
			continue;
		dc = rcu_dereference(device->ldev->disk_conf);

		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
		put_ldev(device);
	}
	rcu_read_unlock();
	connection->write_ordering = wo;
	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(first_peer_device(device)->connection);
		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
			sector, ds >> 9, GFP_NOIO))
			peer_req->flags |= EE_WAS_ERROR;
		drbd_endio_write_sec_final(peer_req);
		return 0;
	}

	if (peer_req->flags & EE_IS_TRIM)
		nr_pages = 0; /* discards don't have any payload. */

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	if (rw & REQ_DISCARD) {
		bio->bi_iter.bi_size = ds;
		goto submit;
	}

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				drbd_err(device,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, ds == 0);
submit:
	D_ASSERT(device, page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
	return idr_find(&connection->peer_devices, volume_number);
}

static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	int data_size = pi->size;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;

	dgs = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (trim) {
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
	if (!peer_req)
		return NULL;

	if (trim)
		return peer_req;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size>>9;
	return peer_req;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(peer_device, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(peer_device->device, page, 0);
	return err;
}

static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int dgs, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	dgs = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}

/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(device, sector, peer_req->i.size);

		err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
	}
	dec_unacked(device);

	return err;
}

static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&device->resource->req_lock);
	list_add(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}

static struct drbd_request *
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}

static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}

static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}

static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}

/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(peer_device, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(device, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(device);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
*/ 1891 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 1892 spin_lock_irq(&device->resource->req_lock); 1893 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 1894 drbd_remove_epoch_entry_interval(device, peer_req); 1895 if (peer_req->flags & EE_RESTART_REQUESTS) 1896 restart_conflicting_writes(device, sector, peer_req->i.size); 1897 spin_unlock_irq(&device->resource->req_lock); 1898 } else 1899 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1900 1901 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 1902 1903 return err; 1904 } 1905 1906 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 1907 { 1908 struct drbd_peer_request *peer_req = 1909 container_of(w, struct drbd_peer_request, w); 1910 struct drbd_peer_device *peer_device = peer_req->peer_device; 1911 int err; 1912 1913 err = drbd_send_ack(peer_device, ack, peer_req); 1914 dec_unacked(peer_device->device); 1915 1916 return err; 1917 } 1918 1919 static int e_send_superseded(struct drbd_work *w, int unused) 1920 { 1921 return e_send_ack(w, P_SUPERSEDED); 1922 } 1923 1924 static int e_send_retry_write(struct drbd_work *w, int unused) 1925 { 1926 struct drbd_peer_request *peer_req = 1927 container_of(w, struct drbd_peer_request, w); 1928 struct drbd_connection *connection = peer_req->peer_device->connection; 1929 1930 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 1931 P_RETRY_WRITE : P_SUPERSEDED); 1932 } 1933 1934 static bool seq_greater(u32 a, u32 b) 1935 { 1936 /* 1937 * We assume 32-bit wrap-around here. 1938 * For 24-bit wrap-around, we would have to shift: 1939 * a <<= 8; b <<= 8; 1940 */ 1941 return (s32)a - (s32)b > 0; 1942 } 1943 1944 static u32 seq_max(u32 a, u32 b) 1945 { 1946 return seq_greater(a, b) ? a : b; 1947 } 1948 1949 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 1950 { 1951 struct drbd_device *device = peer_device->device; 1952 unsigned int newest_peer_seq; 1953 1954 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 1955 spin_lock(&device->peer_seq_lock); 1956 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 1957 device->peer_seq = newest_peer_seq; 1958 spin_unlock(&device->peer_seq_lock); 1959 /* wake up only if we actually changed device->peer_seq */ 1960 if (peer_seq == newest_peer_seq) 1961 wake_up(&device->seq_wait); 1962 } 1963 } 1964 1965 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 1966 { 1967 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 1968 } 1969 1970 /* maybe change sync_ee into interval trees as well? */ 1971 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 1972 { 1973 struct drbd_peer_request *rs_req; 1974 bool rv = 0; 1975 1976 spin_lock_irq(&device->resource->req_lock); 1977 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 1978 if (overlaps(peer_req->i.sector, peer_req->i.size, 1979 rs_req->i.sector, rs_req->i.size)) { 1980 rv = 1; 1981 break; 1982 } 1983 } 1984 spin_unlock_irq(&device->resource->req_lock); 1985 1986 return rv; 1987 } 1988 1989 /* Called from receive_Data. 1990 * Synchronize packets on sock with packets on msock. 1991 * 1992 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 1993 * packet traveling on msock, they are still processed in the order they have 1994 * been sent. 1995 * 1996 * Note: we don't care for Ack packets overtaking P_DATA packets. 
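 *
 * Illustrative example (values invented, not part of the original comment, and
 * only relevant when RESOLVE_CONFLICTS is set): suppose device->peer_seq is 41
 * and a P_DATA with seq_num 43 arrives on the data socket. Since 43 - 1 is still
 * greater than 41, a packet from the peer carrying seq 42 (typically an ack
 * travelling on the msock) has not been processed yet, so
 * wait_for_and_update_peer_seq() below sleeps on seq_wait. Once that ack is
 * processed and update_peer_seq() has bumped peer_seq to 42, the P_DATA is the
 * logically next packet: we record peer_seq = 43 and go on.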
1997 * 1998 * In case packet_seq is larger than device->peer_seq number, there are 1999 * outstanding packets on the msock. We wait for them to arrive. 2000 * In case we are the logically next packet, we update device->peer_seq 2001 * ourselves. Correctly handles 32bit wrap around. 2002 * 2003 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second, 2004 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 2005 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 2006 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around... 2007 * 2008 * returns 0 if we may process the packet, 2009 * -ERESTARTSYS if we were interrupted (by disconnect signal), or -ETIMEDOUT if we gave up waiting for the missing ack packets. */ 2010 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 2011 { 2012 struct drbd_device *device = peer_device->device; 2013 DEFINE_WAIT(wait); 2014 long timeout; 2015 int ret = 0, tp; 2016 2017 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 2018 return 0; 2019 2020 spin_lock(&device->peer_seq_lock); 2021 for (;;) { 2022 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 2023 device->peer_seq = seq_max(device->peer_seq, peer_seq); 2024 break; 2025 } 2026 2027 if (signal_pending(current)) { 2028 ret = -ERESTARTSYS; 2029 break; 2030 } 2031 2032 rcu_read_lock(); 2033 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries; 2034 rcu_read_unlock(); 2035 2036 if (!tp) 2037 break; 2038 2039 /* Only need to wait if two_primaries is enabled */ 2040 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2041 spin_unlock(&device->peer_seq_lock); 2042 rcu_read_lock(); 2043 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2044 rcu_read_unlock(); 2045 timeout = schedule_timeout(timeout); 2046 spin_lock(&device->peer_seq_lock); 2047 if (!timeout) { 2048 ret = -ETIMEDOUT; 2049 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2050 break; 2051 } 2052 } 2053 spin_unlock(&device->peer_seq_lock); 2054 finish_wait(&device->seq_wait, &wait); 2055 return ret; 2056 } 2057 2058 /* see also bio_flags_to_wire() 2059 * DRBD_REQ_*, because we need to semantically map the flags to data packet 2060 * flags and back. We may replicate to other kernel versions. */ 2061 static unsigned long wire_flags_to_bio(u32 dpf) 2062 { 2063 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2064 (dpf & DP_FUA ? REQ_FUA : 0) | 2065 (dpf & DP_FLUSH ? REQ_FLUSH : 0) | 2066 (dpf & DP_DISCARD ?
REQ_DISCARD : 0); 2067 } 2068 2069 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2070 unsigned int size) 2071 { 2072 struct drbd_interval *i; 2073 2074 repeat: 2075 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2076 struct drbd_request *req; 2077 struct bio_and_error m; 2078 2079 if (!i->local) 2080 continue; 2081 req = container_of(i, struct drbd_request, i); 2082 if (!(req->rq_state & RQ_POSTPONED)) 2083 continue; 2084 req->rq_state &= ~RQ_POSTPONED; 2085 __req_mod(req, NEG_ACKED, &m); 2086 spin_unlock_irq(&device->resource->req_lock); 2087 if (m.bio) 2088 complete_master_bio(device, &m); 2089 spin_lock_irq(&device->resource->req_lock); 2090 goto repeat; 2091 } 2092 } 2093 2094 static int handle_write_conflicts(struct drbd_device *device, 2095 struct drbd_peer_request *peer_req) 2096 { 2097 struct drbd_connection *connection = peer_req->peer_device->connection; 2098 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2099 sector_t sector = peer_req->i.sector; 2100 const unsigned int size = peer_req->i.size; 2101 struct drbd_interval *i; 2102 bool equal; 2103 int err; 2104 2105 /* 2106 * Inserting the peer request into the write_requests tree will prevent 2107 * new conflicting local requests from being added. 2108 */ 2109 drbd_insert_interval(&device->write_requests, &peer_req->i); 2110 2111 repeat: 2112 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2113 if (i == &peer_req->i) 2114 continue; 2115 2116 if (!i->local) { 2117 /* 2118 * Our peer has sent a conflicting remote request; this 2119 * should not happen in a two-node setup. Wait for the 2120 * earlier peer request to complete. 2121 */ 2122 err = drbd_wait_misc(device, i); 2123 if (err) 2124 goto out; 2125 goto repeat; 2126 } 2127 2128 equal = i->sector == sector && i->size == size; 2129 if (resolve_conflicts) { 2130 /* 2131 * If the peer request is fully contained within the 2132 * overlapping request, it can be considered overwritten 2133 * and thus superseded; otherwise, it will be retried 2134 * once all overlapping requests have completed. 2135 */ 2136 bool superseded = i->sector <= sector && i->sector + 2137 (i->size >> 9) >= sector + (size >> 9); 2138 2139 if (!equal) 2140 drbd_alert(device, "Concurrent writes detected: " 2141 "local=%llus +%u, remote=%llus +%u, " 2142 "assuming %s came first\n", 2143 (unsigned long long)i->sector, i->size, 2144 (unsigned long long)sector, size, 2145 superseded ? "local" : "remote"); 2146 2147 inc_unacked(device); 2148 peer_req->w.cb = superseded ? e_send_superseded : 2149 e_send_retry_write; 2150 list_add_tail(&peer_req->w.list, &device->done_ee); 2151 wake_asender(connection); 2152 2153 err = -ENOENT; 2154 goto out; 2155 } else { 2156 struct drbd_request *req = 2157 container_of(i, struct drbd_request, i); 2158 2159 if (!equal) 2160 drbd_alert(device, "Concurrent writes detected: " 2161 "local=%llus +%u, remote=%llus +%u\n", 2162 (unsigned long long)i->sector, i->size, 2163 (unsigned long long)sector, size); 2164 2165 if (req->rq_state & RQ_LOCAL_PENDING || 2166 !(req->rq_state & RQ_POSTPONED)) { 2167 /* 2168 * Wait for the node with the discard flag to 2169 * decide if this request has been superseded 2170 * or needs to be retried. 2171 * Requests that have been superseded will 2172 * disappear from the write_requests tree. 2173 * 2174 * In addition, wait for the conflicting 2175 * request to finish locally before submitting 2176 * the conflicting peer request. 
2177 */ 2178 err = drbd_wait_misc(device, &req->i); 2179 if (err) { 2180 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2181 fail_postponed_requests(device, sector, size); 2182 goto out; 2183 } 2184 goto repeat; 2185 } 2186 /* 2187 * Remember to restart the conflicting requests after 2188 * the new peer request has completed. 2189 */ 2190 peer_req->flags |= EE_RESTART_REQUESTS; 2191 } 2192 } 2193 err = 0; 2194 2195 out: 2196 if (err) 2197 drbd_remove_epoch_entry_interval(device, peer_req); 2198 return err; 2199 } 2200 2201 /* mirrored write */ 2202 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2203 { 2204 struct drbd_peer_device *peer_device; 2205 struct drbd_device *device; 2206 sector_t sector; 2207 struct drbd_peer_request *peer_req; 2208 struct p_data *p = pi->data; 2209 u32 peer_seq = be32_to_cpu(p->seq_num); 2210 int rw = WRITE; 2211 u32 dp_flags; 2212 int err, tp; 2213 2214 peer_device = conn_peer_device(connection, pi->vnr); 2215 if (!peer_device) 2216 return -EIO; 2217 device = peer_device->device; 2218 2219 if (!get_ldev(device)) { 2220 int err2; 2221 2222 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2223 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2224 atomic_inc(&connection->current_epoch->epoch_size); 2225 err2 = drbd_drain_block(peer_device, pi->size); 2226 if (!err) 2227 err = err2; 2228 return err; 2229 } 2230 2231 /* 2232 * Corresponding put_ldev done either below (on various errors), or in 2233 * drbd_peer_request_endio, if we successfully submit the data at the 2234 * end of this function. 2235 */ 2236 2237 sector = be64_to_cpu(p->sector); 2238 peer_req = read_in_block(peer_device, p->block_id, sector, pi); 2239 if (!peer_req) { 2240 put_ldev(device); 2241 return -EIO; 2242 } 2243 2244 peer_req->w.cb = e_end_block; 2245 2246 dp_flags = be32_to_cpu(p->dp_flags); 2247 rw |= wire_flags_to_bio(dp_flags); 2248 if (pi->cmd == P_TRIM) { 2249 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev); 2250 peer_req->flags |= EE_IS_TRIM; 2251 if (!blk_queue_discard(q)) 2252 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT; 2253 D_ASSERT(peer_device, peer_req->i.size > 0); 2254 D_ASSERT(peer_device, rw & REQ_DISCARD); 2255 D_ASSERT(peer_device, peer_req->pages == NULL); 2256 } else if (peer_req->pages == NULL) { 2257 D_ASSERT(device, peer_req->i.size == 0); 2258 D_ASSERT(device, dp_flags & DP_FLUSH); 2259 } 2260 2261 if (dp_flags & DP_MAY_SET_IN_SYNC) 2262 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2263 2264 spin_lock(&connection->epoch_lock); 2265 peer_req->epoch = connection->current_epoch; 2266 atomic_inc(&peer_req->epoch->epoch_size); 2267 atomic_inc(&peer_req->epoch->active); 2268 spin_unlock(&connection->epoch_lock); 2269 2270 rcu_read_lock(); 2271 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2272 rcu_read_unlock(); 2273 if (tp) { 2274 peer_req->flags |= EE_IN_INTERVAL_TREE; 2275 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2276 if (err) 2277 goto out_interrupted; 2278 spin_lock_irq(&device->resource->req_lock); 2279 err = handle_write_conflicts(device, peer_req); 2280 if (err) { 2281 spin_unlock_irq(&device->resource->req_lock); 2282 if (err == -ENOENT) { 2283 put_ldev(device); 2284 return 0; 2285 } 2286 goto out_interrupted; 2287 } 2288 } else { 2289 update_peer_seq(peer_device, peer_seq); 2290 spin_lock_irq(&device->resource->req_lock); 2291 } 2292 /* if we use the zeroout fallback code, we process synchronously 2293 * and we wait for all pending 
requests, respectively wait for 2294 * active_ee to become empty in drbd_submit_peer_request(); 2295 * better not add ourselves here. */ 2296 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0) 2297 list_add(&peer_req->w.list, &device->active_ee); 2298 spin_unlock_irq(&device->resource->req_lock); 2299 2300 if (device->state.conn == C_SYNC_TARGET) 2301 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2302 2303 if (peer_device->connection->agreed_pro_version < 100) { 2304 rcu_read_lock(); 2305 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) { 2306 case DRBD_PROT_C: 2307 dp_flags |= DP_SEND_WRITE_ACK; 2308 break; 2309 case DRBD_PROT_B: 2310 dp_flags |= DP_SEND_RECEIVE_ACK; 2311 break; 2312 } 2313 rcu_read_unlock(); 2314 } 2315 2316 if (dp_flags & DP_SEND_WRITE_ACK) { 2317 peer_req->flags |= EE_SEND_WRITE_ACK; 2318 inc_unacked(device); 2319 /* corresponding dec_unacked() in e_end_block() 2320 * respective _drbd_clear_done_ee */ 2321 } 2322 2323 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2324 /* I really don't like it that the receiver thread 2325 * sends on the msock, but anyways */ 2326 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req); 2327 } 2328 2329 if (device->state.pdsk < D_INCONSISTENT) { 2330 /* In case we have the only disk of the cluster, */ 2331 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 2332 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2333 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2334 drbd_al_begin_io(device, &peer_req->i, true); 2335 } 2336 2337 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR); 2338 if (!err) 2339 return 0; 2340 2341 /* don't care for the reason here */ 2342 drbd_err(device, "submit failed, triggering re-connect\n"); 2343 spin_lock_irq(&device->resource->req_lock); 2344 list_del(&peer_req->w.list); 2345 drbd_remove_epoch_entry_interval(device, peer_req); 2346 spin_unlock_irq(&device->resource->req_lock); 2347 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) 2348 drbd_al_complete_io(device, &peer_req->i); 2349 2350 out_interrupted: 2351 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP); 2352 put_ldev(device); 2353 drbd_free_peer_req(device, peer_req); 2354 return err; 2355 } 2356 2357 /* We may throttle resync, if the lower device seems to be busy, 2358 * and current sync rate is above c_min_rate. 2359 * 2360 * To decide whether or not the lower device is busy, we use a scheme similar 2361 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2362 * (more than 64 sectors) of activity we cannot account for with our own resync 2363 * activity, it obviously is "busy". 2364 * 2365 * The current sync rate used here uses only the most recent two step marks, 2366 * to have a short time average so we can react faster. 
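 *
 * Rough worked example (numbers invented for illustration, assuming the usual
 * 4 KiB of storage per bitmap bit): if the most recent sync mark is about
 * 3 seconds old (dt = 3) and rs_mark_left dropped by 1500 bits since then,
 * the recent rate is dbdt = Bit2KB(1500 / 3) = 2000 KiB/s. With a c_min_rate
 * of, say, 250 KiB/s that is clearly above the configured floor, so
 * drbd_rs_c_min_rate_throttle() returns true and drbd_rs_should_slow_down()
 * may throttle, provided the backing device also shows enough activity that
 * we cannot attribute to our own resync.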
2367 */ 2368 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector) 2369 { 2370 struct lc_element *tmp; 2371 bool throttle = true; 2372 2373 if (!drbd_rs_c_min_rate_throttle(device)) 2374 return false; 2375 2376 spin_lock_irq(&device->al_lock); 2377 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2378 if (tmp) { 2379 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2380 if (test_bit(BME_PRIORITY, &bm_ext->flags)) 2381 throttle = false; 2382 /* Do not slow down if app IO is already waiting for this extent */ 2383 } 2384 spin_unlock_irq(&device->al_lock); 2385 2386 return throttle; 2387 } 2388 2389 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) 2390 { 2391 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 2392 unsigned long db, dt, dbdt; 2393 unsigned int c_min_rate; 2394 int curr_events; 2395 2396 rcu_read_lock(); 2397 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2398 rcu_read_unlock(); 2399 2400 /* feature disabled? */ 2401 if (c_min_rate == 0) 2402 return false; 2403 2404 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + 2405 (int)part_stat_read(&disk->part0, sectors[1]) - 2406 atomic_read(&device->rs_sect_ev); 2407 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) { 2408 unsigned long rs_left; 2409 int i; 2410 2411 device->rs_last_events = curr_events; 2412 2413 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2414 * approx. */ 2415 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2416 2417 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2418 rs_left = device->ov_left; 2419 else 2420 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2421 2422 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2423 if (!dt) 2424 dt++; 2425 db = device->rs_mark_left[i] - rs_left; 2426 dbdt = Bit2KB(db/dt); 2427 2428 if (dbdt > c_min_rate) 2429 return true; 2430 } 2431 return false; 2432 } 2433 2434 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2435 { 2436 struct drbd_peer_device *peer_device; 2437 struct drbd_device *device; 2438 sector_t sector; 2439 sector_t capacity; 2440 struct drbd_peer_request *peer_req; 2441 struct digest_info *di = NULL; 2442 int size, verb; 2443 unsigned int fault_type; 2444 struct p_block_req *p = pi->data; 2445 2446 peer_device = conn_peer_device(connection, pi->vnr); 2447 if (!peer_device) 2448 return -EIO; 2449 device = peer_device->device; 2450 capacity = drbd_get_capacity(device->this_bdev); 2451 2452 sector = be64_to_cpu(p->sector); 2453 size = be32_to_cpu(p->blksize); 2454 2455 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2456 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2457 (unsigned long long)sector, size); 2458 return -EINVAL; 2459 } 2460 if (sector + (size>>9) > capacity) { 2461 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2462 (unsigned long long)sector, size); 2463 return -EINVAL; 2464 } 2465 2466 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2467 verb = 1; 2468 switch (pi->cmd) { 2469 case P_DATA_REQUEST: 2470 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2471 break; 2472 case P_RS_DATA_REQUEST: 2473 case P_CSUM_RS_REQUEST: 2474 case P_OV_REQUEST: 2475 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2476 break; 2477 case P_OV_REPLY: 2478 verb = 0; 2479 dec_rs_pending(device); 2480 drbd_send_ack_ex(peer_device, P_OV_RESULT, 
sector, size, ID_IN_SYNC); 2481 break; 2482 default: 2483 BUG(); 2484 } 2485 if (verb && __ratelimit(&drbd_ratelimit_state)) 2486 drbd_err(device, "Can not satisfy peer's read request, " 2487 "no local data.\n"); 2488 2489 /* drain possibly payload */ 2490 return drbd_drain_block(peer_device, pi->size); 2491 } 2492 2493 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2494 * "criss-cross" setup, that might cause write-out on some other DRBD, 2495 * which in turn might block on the other node at this very place. */ 2496 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, 2497 true /* has real payload */, GFP_NOIO); 2498 if (!peer_req) { 2499 put_ldev(device); 2500 return -ENOMEM; 2501 } 2502 2503 switch (pi->cmd) { 2504 case P_DATA_REQUEST: 2505 peer_req->w.cb = w_e_end_data_req; 2506 fault_type = DRBD_FAULT_DT_RD; 2507 /* application IO, don't drbd_rs_begin_io */ 2508 goto submit; 2509 2510 case P_RS_DATA_REQUEST: 2511 peer_req->w.cb = w_e_end_rsdata_req; 2512 fault_type = DRBD_FAULT_RS_RD; 2513 /* used in the sector offset progress display */ 2514 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2515 break; 2516 2517 case P_OV_REPLY: 2518 case P_CSUM_RS_REQUEST: 2519 fault_type = DRBD_FAULT_RS_RD; 2520 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2521 if (!di) 2522 goto out_free_e; 2523 2524 di->digest_size = pi->size; 2525 di->digest = (((char *)di)+sizeof(struct digest_info)); 2526 2527 peer_req->digest = di; 2528 peer_req->flags |= EE_HAS_DIGEST; 2529 2530 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2531 goto out_free_e; 2532 2533 if (pi->cmd == P_CSUM_RS_REQUEST) { 2534 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2535 peer_req->w.cb = w_e_end_csum_rs_req; 2536 /* used in the sector offset progress display */ 2537 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2538 } else if (pi->cmd == P_OV_REPLY) { 2539 /* track progress, we may need to throttle */ 2540 atomic_add(size >> 9, &device->rs_sect_in); 2541 peer_req->w.cb = w_e_end_ov_reply; 2542 dec_rs_pending(device); 2543 /* drbd_rs_begin_io done when we sent this request, 2544 * but accounting still needs to be done. */ 2545 goto submit_for_resync; 2546 } 2547 break; 2548 2549 case P_OV_REQUEST: 2550 if (device->ov_start_sector == ~(sector_t)0 && 2551 peer_device->connection->agreed_pro_version >= 90) { 2552 unsigned long now = jiffies; 2553 int i; 2554 device->ov_start_sector = sector; 2555 device->ov_position = sector; 2556 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2557 device->rs_total = device->ov_left; 2558 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2559 device->rs_mark_left[i] = device->ov_left; 2560 device->rs_mark_time[i] = now; 2561 } 2562 drbd_info(device, "Online Verify start sector: %llu\n", 2563 (unsigned long long)sector); 2564 } 2565 peer_req->w.cb = w_e_end_ov_req; 2566 fault_type = DRBD_FAULT_RS_RD; 2567 break; 2568 2569 default: 2570 BUG(); 2571 } 2572 2573 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2574 * wrt the receiver, but it is not as straightforward as it may seem. 2575 * Various places in the resync start and stop logic assume resync 2576 * requests are processed in order, requeuing this on the worker thread 2577 * introduces a bunch of new code for synchronization between threads. 
2578 * 2579 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2580 * "forever", throttling after drbd_rs_begin_io will lock that extent 2581 * for application writes for the same time. For now, just throttle 2582 * here, where the rest of the code expects the receiver to sleep for 2583 * a while, anyways. 2584 */ 2585 2586 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2587 * this defers syncer requests for some time, before letting at least 2588 * on request through. The resync controller on the receiving side 2589 * will adapt to the incoming rate accordingly. 2590 * 2591 * We cannot throttle here if remote is Primary/SyncTarget: 2592 * we would also throttle its application reads. 2593 * In that case, throttling is done on the SyncTarget only. 2594 */ 2595 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector)) 2596 schedule_timeout_uninterruptible(HZ/10); 2597 if (drbd_rs_begin_io(device, sector)) 2598 goto out_free_e; 2599 2600 submit_for_resync: 2601 atomic_add(size >> 9, &device->rs_sect_ev); 2602 2603 submit: 2604 inc_unacked(device); 2605 spin_lock_irq(&device->resource->req_lock); 2606 list_add_tail(&peer_req->w.list, &device->read_ee); 2607 spin_unlock_irq(&device->resource->req_lock); 2608 2609 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0) 2610 return 0; 2611 2612 /* don't care for the reason here */ 2613 drbd_err(device, "submit failed, triggering re-connect\n"); 2614 spin_lock_irq(&device->resource->req_lock); 2615 list_del(&peer_req->w.list); 2616 spin_unlock_irq(&device->resource->req_lock); 2617 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2618 2619 out_free_e: 2620 put_ldev(device); 2621 drbd_free_peer_req(device, peer_req); 2622 return -EIO; 2623 } 2624 2625 /** 2626 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2627 */ 2628 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2629 { 2630 struct drbd_device *device = peer_device->device; 2631 int self, peer, rv = -100; 2632 unsigned long ch_self, ch_peer; 2633 enum drbd_after_sb_p after_sb_0p; 2634 2635 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2636 peer = device->p_uuid[UI_BITMAP] & 1; 2637 2638 ch_peer = device->p_uuid[UI_SIZE]; 2639 ch_self = device->comm_bm_set; 2640 2641 rcu_read_lock(); 2642 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2643 rcu_read_unlock(); 2644 switch (after_sb_0p) { 2645 case ASB_CONSENSUS: 2646 case ASB_DISCARD_SECONDARY: 2647 case ASB_CALL_HELPER: 2648 case ASB_VIOLENTLY: 2649 drbd_err(device, "Configuration error.\n"); 2650 break; 2651 case ASB_DISCONNECT: 2652 break; 2653 case ASB_DISCARD_YOUNGER_PRI: 2654 if (self == 0 && peer == 1) { 2655 rv = -1; 2656 break; 2657 } 2658 if (self == 1 && peer == 0) { 2659 rv = 1; 2660 break; 2661 } 2662 /* Else fall through to one of the other strategies... */ 2663 case ASB_DISCARD_OLDER_PRI: 2664 if (self == 0 && peer == 1) { 2665 rv = 1; 2666 break; 2667 } 2668 if (self == 1 && peer == 0) { 2669 rv = -1; 2670 break; 2671 } 2672 /* Else fall through to one of the other strategies... */ 2673 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2674 "Using discard-least-changes instead\n"); 2675 case ASB_DISCARD_ZERO_CHG: 2676 if (ch_peer == 0 && ch_self == 0) { 2677 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2678 ? 
-1 : 1; 2679 break; 2680 } else { 2681 if (ch_peer == 0) { rv = 1; break; } 2682 if (ch_self == 0) { rv = -1; break; } 2683 } 2684 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2685 break; 2686 case ASB_DISCARD_LEAST_CHG: 2687 if (ch_self < ch_peer) 2688 rv = -1; 2689 else if (ch_self > ch_peer) 2690 rv = 1; 2691 else /* ( ch_self == ch_peer ) */ 2692 /* Well, then use something else. */ 2693 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2694 ? -1 : 1; 2695 break; 2696 case ASB_DISCARD_LOCAL: 2697 rv = -1; 2698 break; 2699 case ASB_DISCARD_REMOTE: 2700 rv = 1; 2701 } 2702 2703 return rv; 2704 } 2705 2706 /** 2707 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2708 */ 2709 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2710 { 2711 struct drbd_device *device = peer_device->device; 2712 int hg, rv = -100; 2713 enum drbd_after_sb_p after_sb_1p; 2714 2715 rcu_read_lock(); 2716 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2717 rcu_read_unlock(); 2718 switch (after_sb_1p) { 2719 case ASB_DISCARD_YOUNGER_PRI: 2720 case ASB_DISCARD_OLDER_PRI: 2721 case ASB_DISCARD_LEAST_CHG: 2722 case ASB_DISCARD_LOCAL: 2723 case ASB_DISCARD_REMOTE: 2724 case ASB_DISCARD_ZERO_CHG: 2725 drbd_err(device, "Configuration error.\n"); 2726 break; 2727 case ASB_DISCONNECT: 2728 break; 2729 case ASB_CONSENSUS: 2730 hg = drbd_asb_recover_0p(peer_device); 2731 if (hg == -1 && device->state.role == R_SECONDARY) 2732 rv = hg; 2733 if (hg == 1 && device->state.role == R_PRIMARY) 2734 rv = hg; 2735 break; 2736 case ASB_VIOLENTLY: 2737 rv = drbd_asb_recover_0p(peer_device); 2738 break; 2739 case ASB_DISCARD_SECONDARY: 2740 return device->state.role == R_PRIMARY ? 1 : -1; 2741 case ASB_CALL_HELPER: 2742 hg = drbd_asb_recover_0p(peer_device); 2743 if (hg == -1 && device->state.role == R_PRIMARY) { 2744 enum drbd_state_rv rv2; 2745 2746 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2747 * we might be here in C_WF_REPORT_PARAMS which is transient. 2748 * we do not need to wait for the after state change work either. 
*/ 2749 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2750 if (rv2 != SS_SUCCESS) { 2751 drbd_khelper(device, "pri-lost-after-sb"); 2752 } else { 2753 drbd_warn(device, "Successfully gave up primary role.\n"); 2754 rv = hg; 2755 } 2756 } else 2757 rv = hg; 2758 } 2759 2760 return rv; 2761 } 2762 2763 /** 2764 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2765 */ 2766 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2767 { 2768 struct drbd_device *device = peer_device->device; 2769 int hg, rv = -100; 2770 enum drbd_after_sb_p after_sb_2p; 2771 2772 rcu_read_lock(); 2773 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2774 rcu_read_unlock(); 2775 switch (after_sb_2p) { 2776 case ASB_DISCARD_YOUNGER_PRI: 2777 case ASB_DISCARD_OLDER_PRI: 2778 case ASB_DISCARD_LEAST_CHG: 2779 case ASB_DISCARD_LOCAL: 2780 case ASB_DISCARD_REMOTE: 2781 case ASB_CONSENSUS: 2782 case ASB_DISCARD_SECONDARY: 2783 case ASB_DISCARD_ZERO_CHG: 2784 drbd_err(device, "Configuration error.\n"); 2785 break; 2786 case ASB_VIOLENTLY: 2787 rv = drbd_asb_recover_0p(peer_device); 2788 break; 2789 case ASB_DISCONNECT: 2790 break; 2791 case ASB_CALL_HELPER: 2792 hg = drbd_asb_recover_0p(peer_device); 2793 if (hg == -1) { 2794 enum drbd_state_rv rv2; 2795 2796 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2797 * we might be here in C_WF_REPORT_PARAMS which is transient. 2798 * we do not need to wait for the after state change work either. */ 2799 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2800 if (rv2 != SS_SUCCESS) { 2801 drbd_khelper(device, "pri-lost-after-sb"); 2802 } else { 2803 drbd_warn(device, "Successfully gave up primary role.\n"); 2804 rv = hg; 2805 } 2806 } else 2807 rv = hg; 2808 } 2809 2810 return rv; 2811 } 2812 2813 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 2814 u64 bits, u64 flags) 2815 { 2816 if (!uuid) { 2817 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 2818 return; 2819 } 2820 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2821 text, 2822 (unsigned long long)uuid[UI_CURRENT], 2823 (unsigned long long)uuid[UI_BITMAP], 2824 (unsigned long long)uuid[UI_HISTORY_START], 2825 (unsigned long long)uuid[UI_HISTORY_END], 2826 (unsigned long long)bits, 2827 (unsigned long long)flags); 2828 } 2829 2830 /* 2831 100 after split brain try auto recover 2832 2 C_SYNC_SOURCE set BitMap 2833 1 C_SYNC_SOURCE use BitMap 2834 0 no Sync 2835 -1 C_SYNC_TARGET use BitMap 2836 -2 C_SYNC_TARGET set BitMap 2837 -100 after split brain, disconnect 2838 -1000 unrelated data 2839 -1091 requires proto 91 2840 -1096 requires proto 96 2841 */ 2842 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local) 2843 { 2844 u64 self, peer; 2845 int i, j; 2846 2847 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2848 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2849 2850 *rule_nr = 10; 2851 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2852 return 0; 2853 2854 *rule_nr = 20; 2855 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2856 peer != UUID_JUST_CREATED) 2857 return -2; 2858 2859 *rule_nr = 30; 2860 if (self != UUID_JUST_CREATED && 2861 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2862 return 2; 2863 2864 if (self == peer) { 2865 int rct, dc; /* roles at crash time */ 2866 2867 if (device->p_uuid[UI_BITMAP] == (u64)0 && 
device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2868 2869 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2870 return -1091; 2871 2872 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2873 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2874 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2875 drbd_uuid_move_history(device); 2876 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 2877 device->ldev->md.uuid[UI_BITMAP] = 0; 2878 2879 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2880 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 2881 *rule_nr = 34; 2882 } else { 2883 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 2884 *rule_nr = 36; 2885 } 2886 2887 return 1; 2888 } 2889 2890 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 2891 2892 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2893 return -1091; 2894 2895 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 2896 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 2897 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 2898 2899 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 2900 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 2901 device->p_uuid[UI_BITMAP] = 0UL; 2902 2903 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2904 *rule_nr = 35; 2905 } else { 2906 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 2907 *rule_nr = 37; 2908 } 2909 2910 return -1; 2911 } 2912 2913 /* Common power [off|failure] */ 2914 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 2915 (device->p_uuid[UI_FLAGS] & 2); 2916 /* lowest bit is set when we were primary, 2917 * next bit (weight 2) is set when peer was primary */ 2918 *rule_nr = 40; 2919 2920 switch (rct) { 2921 case 0: /* !self_pri && !peer_pri */ return 0; 2922 case 1: /* self_pri && !peer_pri */ return 1; 2923 case 2: /* !self_pri && peer_pri */ return -1; 2924 case 3: /* self_pri && peer_pri */ 2925 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags); 2926 return dc ? -1 : 1; 2927 } 2928 } 2929 2930 *rule_nr = 50; 2931 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 2932 if (self == peer) 2933 return -1; 2934 2935 *rule_nr = 51; 2936 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 2937 if (self == peer) { 2938 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 2939 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 2940 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 2941 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 2942 /* The last P_SYNC_UUID did not get though. Undo the last start of 2943 resync as sync source modifications of the peer's UUIDs. 
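 * (Mechanically, the two assignments below shift the peer's UUID view back by
 *  one slot: Bitmap takes the old History-start value and History-start takes
 *  History-start+1, undoing the rotation the peer performed when it started
 *  that resync, so the regular rules below can match the UUIDs again.)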
*/ 2944 2945 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2946 return -1091; 2947 2948 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 2949 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 2950 2951 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 2952 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2953 2954 return -1; 2955 } 2956 } 2957 2958 *rule_nr = 60; 2959 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2960 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2961 peer = device->p_uuid[i] & ~((u64)1); 2962 if (self == peer) 2963 return -2; 2964 } 2965 2966 *rule_nr = 70; 2967 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2968 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2969 if (self == peer) 2970 return 1; 2971 2972 *rule_nr = 71; 2973 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 2974 if (self == peer) { 2975 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 2976 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 2977 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 2978 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 2979 /* The last P_SYNC_UUID did not get though. Undo the last start of 2980 resync as sync source modifications of our UUIDs. */ 2981 2982 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2983 return -1091; 2984 2985 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 2986 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 2987 2988 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 2989 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2990 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 2991 2992 return 1; 2993 } 2994 } 2995 2996 2997 *rule_nr = 80; 2998 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2999 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3000 self = device->ldev->md.uuid[i] & ~((u64)1); 3001 if (self == peer) 3002 return 2; 3003 } 3004 3005 *rule_nr = 90; 3006 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3007 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3008 if (self == peer && self != ((u64)0)) 3009 return 100; 3010 3011 *rule_nr = 100; 3012 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3013 self = device->ldev->md.uuid[i] & ~((u64)1); 3014 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3015 peer = device->p_uuid[j] & ~((u64)1); 3016 if (self == peer) 3017 return -100; 3018 } 3019 } 3020 3021 return -1000; 3022 } 3023 3024 /* drbd_sync_handshake() returns the new conn state on success, or 3025 CONN_MASK (-1) on failure. 
3026 */ 3027 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 3028 enum drbd_role peer_role, 3029 enum drbd_disk_state peer_disk) __must_hold(local) 3030 { 3031 struct drbd_device *device = peer_device->device; 3032 enum drbd_conns rv = C_MASK; 3033 enum drbd_disk_state mydisk; 3034 struct net_conf *nc; 3035 int hg, rule_nr, rr_conflict, tentative; 3036 3037 mydisk = device->state.disk; 3038 if (mydisk == D_NEGOTIATING) 3039 mydisk = device->new_state_tmp.disk; 3040 3041 drbd_info(device, "drbd_sync_handshake:\n"); 3042 3043 spin_lock_irq(&device->ldev->md.uuid_lock); 3044 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 3045 drbd_uuid_dump(device, "peer", device->p_uuid, 3046 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3047 3048 hg = drbd_uuid_compare(device, &rule_nr); 3049 spin_unlock_irq(&device->ldev->md.uuid_lock); 3050 3051 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 3052 3053 if (hg == -1000) { 3054 drbd_alert(device, "Unrelated data, aborting!\n"); 3055 return C_MASK; 3056 } 3057 if (hg < -1000) { 3058 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3059 return C_MASK; 3060 } 3061 3062 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3063 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3064 int f = (hg == -100) || abs(hg) == 2; 3065 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3066 if (f) 3067 hg = hg*2; 3068 drbd_info(device, "Becoming sync %s due to disk states.\n", 3069 hg > 0 ? "source" : "target"); 3070 } 3071 3072 if (abs(hg) == 100) 3073 drbd_khelper(device, "initial-split-brain"); 3074 3075 rcu_read_lock(); 3076 nc = rcu_dereference(peer_device->connection->net_conf); 3077 3078 if (hg == 100 || (hg == -100 && nc->always_asbp)) { 3079 int pcount = (device->state.role == R_PRIMARY) 3080 + (peer_role == R_PRIMARY); 3081 int forced = (hg == -100); 3082 3083 switch (pcount) { 3084 case 0: 3085 hg = drbd_asb_recover_0p(peer_device); 3086 break; 3087 case 1: 3088 hg = drbd_asb_recover_1p(peer_device); 3089 break; 3090 case 2: 3091 hg = drbd_asb_recover_2p(peer_device); 3092 break; 3093 } 3094 if (abs(hg) < 100) { 3095 drbd_warn(device, "Split-Brain detected, %d primaries, " 3096 "automatically solved. Sync from %s node\n", 3097 pcount, (hg < 0) ? "peer" : "this"); 3098 if (forced) { 3099 drbd_warn(device, "Doing a full sync, since" 3100 " UUIDs where ambiguous.\n"); 3101 hg = hg*2; 3102 } 3103 } 3104 } 3105 3106 if (hg == -100) { 3107 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3108 hg = -1; 3109 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3110 hg = 1; 3111 3112 if (abs(hg) < 100) 3113 drbd_warn(device, "Split-Brain detected, manually solved. " 3114 "Sync from %s node\n", 3115 (hg < 0) ? "peer" : "this"); 3116 } 3117 rr_conflict = nc->rr_conflict; 3118 tentative = nc->tentative; 3119 rcu_read_unlock(); 3120 3121 if (hg == -100) { 3122 /* FIXME this log message is not correct if we end up here 3123 * after an attempted attach on a diskless node. 3124 * We just refuse to attach -- well, we drop the "connection" 3125 * to that disk, in a way... 
*/ 3126 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3127 drbd_khelper(device, "split-brain"); 3128 return C_MASK; 3129 } 3130 3131 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3132 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3133 return C_MASK; 3134 } 3135 3136 if (hg < 0 && /* by intention we do not use mydisk here. */ 3137 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3138 switch (rr_conflict) { 3139 case ASB_CALL_HELPER: 3140 drbd_khelper(device, "pri-lost"); 3141 /* fall through */ 3142 case ASB_DISCONNECT: 3143 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3144 return C_MASK; 3145 case ASB_VIOLENTLY: 3146 drbd_warn(device, "Becoming SyncTarget, violating the stable-data " 3147 "assumption\n"); 3148 } 3149 } 3150 3151 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3152 if (hg == 0) 3153 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3154 else 3155 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n", 3156 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3157 abs(hg) >= 2 ? "full" : "bit-map based"); 3158 return C_MASK; 3159 } 3160 3161 if (abs(hg) >= 2) { 3162 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3163 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3164 BM_LOCKED_SET_ALLOWED)) 3165 return C_MASK; 3166 } 3167 3168 if (hg > 0) { /* become sync source. */ 3169 rv = C_WF_BITMAP_S; 3170 } else if (hg < 0) { /* become sync target */ 3171 rv = C_WF_BITMAP_T; 3172 } else { 3173 rv = C_CONNECTED; 3174 if (drbd_bm_total_weight(device)) { 3175 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3176 drbd_bm_total_weight(device)); 3177 } 3178 } 3179 3180 return rv; 3181 } 3182 3183 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3184 { 3185 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3186 if (peer == ASB_DISCARD_REMOTE) 3187 return ASB_DISCARD_LOCAL; 3188 3189 /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */ 3190 if (peer == ASB_DISCARD_LOCAL) 3191 return ASB_DISCARD_REMOTE; 3192 3193 /* everything else is valid if they are equal on both sides.
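 *
 * Example: if the peer is configured with, say, after-sb-0pri discard-remote,
 * then seen from this node that same policy is discard-local. receive_protocol()
 * below therefore compares convert_after_sb(p_after_sb_0p) against our own
 * nc->after_sb_0p and only complains about "incompatible settings" if the
 * mirrored values still differ.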
*/ 3194 return peer; 3195 } 3196 3197 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3198 { 3199 struct p_protocol *p = pi->data; 3200 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3201 int p_proto, p_discard_my_data, p_two_primaries, cf; 3202 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3203 char integrity_alg[SHARED_SECRET_MAX] = ""; 3204 struct crypto_hash *peer_integrity_tfm = NULL; 3205 void *int_dig_in = NULL, *int_dig_vv = NULL; 3206 3207 p_proto = be32_to_cpu(p->protocol); 3208 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3209 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3210 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3211 p_two_primaries = be32_to_cpu(p->two_primaries); 3212 cf = be32_to_cpu(p->conn_flags); 3213 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3214 3215 if (connection->agreed_pro_version >= 87) { 3216 int err; 3217 3218 if (pi->size > sizeof(integrity_alg)) 3219 return -EIO; 3220 err = drbd_recv_all(connection, integrity_alg, pi->size); 3221 if (err) 3222 return err; 3223 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3224 } 3225 3226 if (pi->cmd != P_PROTOCOL_UPDATE) { 3227 clear_bit(CONN_DRY_RUN, &connection->flags); 3228 3229 if (cf & CF_DRY_RUN) 3230 set_bit(CONN_DRY_RUN, &connection->flags); 3231 3232 rcu_read_lock(); 3233 nc = rcu_dereference(connection->net_conf); 3234 3235 if (p_proto != nc->wire_protocol) { 3236 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3237 goto disconnect_rcu_unlock; 3238 } 3239 3240 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3241 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3242 goto disconnect_rcu_unlock; 3243 } 3244 3245 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3246 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3247 goto disconnect_rcu_unlock; 3248 } 3249 3250 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3251 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3252 goto disconnect_rcu_unlock; 3253 } 3254 3255 if (p_discard_my_data && nc->discard_my_data) { 3256 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3257 goto disconnect_rcu_unlock; 3258 } 3259 3260 if (p_two_primaries != nc->two_primaries) { 3261 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3262 goto disconnect_rcu_unlock; 3263 } 3264 3265 if (strcmp(integrity_alg, nc->integrity_alg)) { 3266 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3267 goto disconnect_rcu_unlock; 3268 } 3269 3270 rcu_read_unlock(); 3271 } 3272 3273 if (integrity_alg[0]) { 3274 int hash_size; 3275 3276 /* 3277 * We can only change the peer data integrity algorithm 3278 * here. Changing our own data integrity algorithm 3279 * requires that we send a P_PROTOCOL_UPDATE packet at 3280 * the same time; otherwise, the peer has no way to 3281 * tell between which packets the algorithm should 3282 * change. 
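 * (Illustration, inferred from the comment above rather than stated in the
 *  original: the integrity digest travels with every data packet, so while
 *  packets are in flight both sides must agree on which algorithm produced
 *  which digest; a P_PROTOCOL_UPDATE sent in the data stream gives the peer
 *  an exact packet boundary at which the switch takes effect.)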
3283 */ 3284 3285 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC); 3286 /* crypto_alloc_hash() returns an ERR_PTR(), never NULL, on failure */ if (IS_ERR(peer_integrity_tfm)) { peer_integrity_tfm = NULL; 3287 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3288 integrity_alg); 3289 goto disconnect; 3290 } 3291 3292 hash_size = crypto_hash_digestsize(peer_integrity_tfm); 3293 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3294 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3295 if (!(int_dig_in && int_dig_vv)) { 3296 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3297 goto disconnect; 3298 } 3299 } 3300 3301 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); 3302 if (!new_net_conf) { 3303 drbd_err(connection, "Allocation of new net_conf failed\n"); 3304 goto disconnect; 3305 } 3306 3307 mutex_lock(&connection->data.mutex); 3308 mutex_lock(&connection->resource->conf_update); 3309 old_net_conf = connection->net_conf; 3310 *new_net_conf = *old_net_conf; 3311 3312 new_net_conf->wire_protocol = p_proto; 3313 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3314 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3315 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3316 new_net_conf->two_primaries = p_two_primaries; 3317 3318 rcu_assign_pointer(connection->net_conf, new_net_conf); 3319 mutex_unlock(&connection->resource->conf_update); 3320 mutex_unlock(&connection->data.mutex); 3321 3322 crypto_free_hash(connection->peer_integrity_tfm); 3323 kfree(connection->int_dig_in); 3324 kfree(connection->int_dig_vv); 3325 connection->peer_integrity_tfm = peer_integrity_tfm; 3326 connection->int_dig_in = int_dig_in; 3327 connection->int_dig_vv = int_dig_vv; 3328 3329 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3330 drbd_info(connection, "peer data-integrity-alg: %s\n", 3331 integrity_alg[0] ? integrity_alg : "(none)"); 3332 3333 synchronize_rcu(); 3334 kfree(old_net_conf); 3335 return 0; 3336 3337 disconnect_rcu_unlock: 3338 rcu_read_unlock(); 3339 disconnect: 3340 crypto_free_hash(peer_integrity_tfm); 3341 kfree(int_dig_in); 3342 kfree(int_dig_vv); 3343 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3344 return -EIO; 3345 } 3346 3347 /* helper function 3348 * input: alg name, feature name 3349 * return: NULL (alg name was "") 3350 * ERR_PTR(error) if something goes wrong 3351 * or the crypto hash ptr, if it worked out ok.
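 *
 * Typical caller pattern, a sketch mirroring receive_SyncParam() further down
 * (identifiers as used there):
 *
 *	verify_tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
 *	if (IS_ERR(verify_tfm)) {
 *		verify_tfm = NULL;
 *		goto disconnect;	-- error was already logged by the helper
 *	}
 *	-- a NULL return just means the algorithm name was empty; nothing to change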
*/ 3352 static 3353 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, 3354 const char *alg, const char *name) 3355 { 3356 struct crypto_hash *tfm; 3357 3358 if (!alg[0]) 3359 return NULL; 3360 3361 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 3362 if (IS_ERR(tfm)) { 3363 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3364 alg, name, PTR_ERR(tfm)); 3365 return tfm; 3366 } 3367 return tfm; 3368 } 3369 3370 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3371 { 3372 void *buffer = connection->data.rbuf; 3373 int size = pi->size; 3374 3375 while (size) { 3376 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3377 s = drbd_recv(connection, buffer, s); 3378 if (s <= 0) { 3379 if (s < 0) 3380 return s; 3381 break; 3382 } 3383 size -= s; 3384 } 3385 if (size) 3386 return -EIO; 3387 return 0; 3388 } 3389 3390 /* 3391 * config_unknown_volume - device configuration command for unknown volume 3392 * 3393 * When a device is added to an existing connection, the node on which the 3394 * device is added first will send configuration commands to its peer but the 3395 * peer will not know about the device yet. It will warn and ignore these 3396 * commands. Once the device is added on the second node, the second node will 3397 * send the same device configuration commands, but in the other direction. 3398 * 3399 * (We can also end up here if drbd is misconfigured.) 3400 */ 3401 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3402 { 3403 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3404 cmdname(pi->cmd), pi->vnr); 3405 return ignore_remaining_packet(connection, pi); 3406 } 3407 3408 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3409 { 3410 struct drbd_peer_device *peer_device; 3411 struct drbd_device *device; 3412 struct p_rs_param_95 *p; 3413 unsigned int header_size, data_size, exp_max_sz; 3414 struct crypto_hash *verify_tfm = NULL; 3415 struct crypto_hash *csums_tfm = NULL; 3416 struct net_conf *old_net_conf, *new_net_conf = NULL; 3417 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3418 const int apv = connection->agreed_pro_version; 3419 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3420 int fifo_size = 0; 3421 int err; 3422 3423 peer_device = conn_peer_device(connection, pi->vnr); 3424 if (!peer_device) 3425 return config_unknown_volume(connection, pi); 3426 device = peer_device->device; 3427 3428 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3429 : apv == 88 ? sizeof(struct p_rs_param) 3430 + SHARED_SECRET_MAX 3431 : apv <= 94 ? 
sizeof(struct p_rs_param_89) 3432 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3433 3434 if (pi->size > exp_max_sz) { 3435 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3436 pi->size, exp_max_sz); 3437 return -EIO; 3438 } 3439 3440 if (apv <= 88) { 3441 header_size = sizeof(struct p_rs_param); 3442 data_size = pi->size - header_size; 3443 } else if (apv <= 94) { 3444 header_size = sizeof(struct p_rs_param_89); 3445 data_size = pi->size - header_size; 3446 D_ASSERT(device, data_size == 0); 3447 } else { 3448 header_size = sizeof(struct p_rs_param_95); 3449 data_size = pi->size - header_size; 3450 D_ASSERT(device, data_size == 0); 3451 } 3452 3453 /* initialize verify_alg and csums_alg */ 3454 p = pi->data; 3455 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 3456 3457 err = drbd_recv_all(peer_device->connection, p, header_size); 3458 if (err) 3459 return err; 3460 3461 mutex_lock(&connection->resource->conf_update); 3462 old_net_conf = peer_device->connection->net_conf; 3463 if (get_ldev(device)) { 3464 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3465 if (!new_disk_conf) { 3466 put_ldev(device); 3467 mutex_unlock(&connection->resource->conf_update); 3468 drbd_err(device, "Allocation of new disk_conf failed\n"); 3469 return -ENOMEM; 3470 } 3471 3472 old_disk_conf = device->ldev->disk_conf; 3473 *new_disk_conf = *old_disk_conf; 3474 3475 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3476 } 3477 3478 if (apv >= 88) { 3479 if (apv == 88) { 3480 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3481 drbd_err(device, "verify-alg of wrong size, " 3482 "peer wants %u, accepting only up to %u byte\n", 3483 data_size, SHARED_SECRET_MAX); 3484 err = -EIO; 3485 goto reconnect; 3486 } 3487 3488 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3489 if (err) 3490 goto reconnect; 3491 /* we expect NUL terminated string */ 3492 /* but just in case someone tries to be evil */ 3493 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3494 p->verify_alg[data_size-1] = 0; 3495 3496 } else /* apv >= 89 */ { 3497 /* we still expect NUL terminated strings */ 3498 /* but just in case someone tries to be evil */ 3499 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3500 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3501 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3502 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3503 } 3504 3505 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3506 if (device->state.conn == C_WF_REPORT_PARAMS) { 3507 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3508 old_net_conf->verify_alg, p->verify_alg); 3509 goto disconnect; 3510 } 3511 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3512 p->verify_alg, "verify-alg"); 3513 if (IS_ERR(verify_tfm)) { 3514 verify_tfm = NULL; 3515 goto disconnect; 3516 } 3517 } 3518 3519 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3520 if (device->state.conn == C_WF_REPORT_PARAMS) { 3521 drbd_err(device, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 3522 old_net_conf->csums_alg, p->csums_alg); 3523 goto disconnect; 3524 } 3525 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3526 p->csums_alg, "csums-alg"); 3527 if (IS_ERR(csums_tfm)) { 3528 csums_tfm = NULL; 3529 goto disconnect; 3530 } 3531 } 3532 3533 if (apv > 94 && new_disk_conf) { 3534 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3535 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3536 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3537 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3538 3539 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3540 if (fifo_size != device->rs_plan_s->size) { 3541 new_plan = fifo_alloc(fifo_size); 3542 if (!new_plan) { 3543 drbd_err(device, "kmalloc of fifo_buffer failed"); 3544 put_ldev(device); 3545 goto disconnect; 3546 } 3547 } 3548 } 3549 3550 if (verify_tfm || csums_tfm) { 3551 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); 3552 if (!new_net_conf) { 3553 drbd_err(device, "Allocation of new net_conf failed\n"); 3554 goto disconnect; 3555 } 3556 3557 *new_net_conf = *old_net_conf; 3558 3559 if (verify_tfm) { 3560 strcpy(new_net_conf->verify_alg, p->verify_alg); 3561 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3562 crypto_free_hash(peer_device->connection->verify_tfm); 3563 peer_device->connection->verify_tfm = verify_tfm; 3564 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3565 } 3566 if (csums_tfm) { 3567 strcpy(new_net_conf->csums_alg, p->csums_alg); 3568 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3569 crypto_free_hash(peer_device->connection->csums_tfm); 3570 peer_device->connection->csums_tfm = csums_tfm; 3571 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3572 } 3573 rcu_assign_pointer(connection->net_conf, new_net_conf); 3574 } 3575 } 3576 3577 if (new_disk_conf) { 3578 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3579 put_ldev(device); 3580 } 3581 3582 if (new_plan) { 3583 old_plan = device->rs_plan_s; 3584 rcu_assign_pointer(device->rs_plan_s, new_plan); 3585 } 3586 3587 mutex_unlock(&connection->resource->conf_update); 3588 synchronize_rcu(); 3589 if (new_net_conf) 3590 kfree(old_net_conf); 3591 kfree(old_disk_conf); 3592 kfree(old_plan); 3593 3594 return 0; 3595 3596 reconnect: 3597 if (new_disk_conf) { 3598 put_ldev(device); 3599 kfree(new_disk_conf); 3600 } 3601 mutex_unlock(&connection->resource->conf_update); 3602 return -EIO; 3603 3604 disconnect: 3605 kfree(new_plan); 3606 if (new_disk_conf) { 3607 put_ldev(device); 3608 kfree(new_disk_conf); 3609 } 3610 mutex_unlock(&connection->resource->conf_update); 3611 /* just for completeness: actually not needed, 3612 * as this is not reached if csums_tfm was ok. */ 3613 crypto_free_hash(csums_tfm); 3614 /* but free the verify_tfm again, if csums_tfm did not work out */ 3615 crypto_free_hash(verify_tfm); 3616 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3617 return -EIO; 3618 } 3619 3620 /* warn if the arguments differ by more than 12.5% */ 3621 static void warn_if_differ_considerably(struct drbd_device *device, 3622 const char *s, sector_t a, sector_t b) 3623 { 3624 sector_t d; 3625 if (a == 0 || b == 0) 3626 return; 3627 d = (a > b) ? (a - b) : (b - a); 3628 if (d > (a>>3) || d > (b>>3)) 3629 drbd_warn(device, "Considerable difference in %s: %llus vs. 
%llus\n", s, 3630 (unsigned long long)a, (unsigned long long)b); 3631 } 3632 3633 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3634 { 3635 struct drbd_peer_device *peer_device; 3636 struct drbd_device *device; 3637 struct p_sizes *p = pi->data; 3638 enum determine_dev_size dd = DS_UNCHANGED; 3639 sector_t p_size, p_usize, my_usize; 3640 int ldsc = 0; /* local disk size changed */ 3641 enum dds_flags ddsf; 3642 3643 peer_device = conn_peer_device(connection, pi->vnr); 3644 if (!peer_device) 3645 return config_unknown_volume(connection, pi); 3646 device = peer_device->device; 3647 3648 p_size = be64_to_cpu(p->d_size); 3649 p_usize = be64_to_cpu(p->u_size); 3650 3651 /* just store the peer's disk size for now. 3652 * we still need to figure out whether we accept that. */ 3653 device->p_size = p_size; 3654 3655 if (get_ldev(device)) { 3656 rcu_read_lock(); 3657 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3658 rcu_read_unlock(); 3659 3660 warn_if_differ_considerably(device, "lower level device sizes", 3661 p_size, drbd_get_max_capacity(device->ldev)); 3662 warn_if_differ_considerably(device, "user requested size", 3663 p_usize, my_usize); 3664 3665 /* if this is the first connect, or an otherwise expected 3666 * param exchange, choose the minimum */ 3667 if (device->state.conn == C_WF_REPORT_PARAMS) 3668 p_usize = min_not_zero(my_usize, p_usize); 3669 3670 /* Never shrink a device with usable data during connect. 3671 But allow online shrinking if we are connected. */ 3672 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) < 3673 drbd_get_capacity(device->this_bdev) && 3674 device->state.disk >= D_OUTDATED && 3675 device->state.conn < C_CONNECTED) { 3676 drbd_err(device, "The peer's disk size is too small!\n"); 3677 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3678 put_ldev(device); 3679 return -EIO; 3680 } 3681 3682 if (my_usize != p_usize) { 3683 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3684 3685 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3686 if (!new_disk_conf) { 3687 drbd_err(device, "Allocation of new disk_conf failed\n"); 3688 put_ldev(device); 3689 return -ENOMEM; 3690 } 3691 3692 mutex_lock(&connection->resource->conf_update); 3693 old_disk_conf = device->ldev->disk_conf; 3694 *new_disk_conf = *old_disk_conf; 3695 new_disk_conf->disk_size = p_usize; 3696 3697 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3698 mutex_unlock(&connection->resource->conf_update); 3699 synchronize_rcu(); 3700 kfree(old_disk_conf); 3701 3702 drbd_info(device, "Peer sets u_size to %lu sectors\n", 3703 (unsigned long)my_usize); 3704 } 3705 3706 put_ldev(device); 3707 } 3708 3709 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3710 drbd_reconsider_max_bio_size(device); 3711 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size(). 3712 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 3713 drbd_reconsider_max_bio_size(), we can be sure that after 3714 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */ 3715 3716 ddsf = be16_to_cpu(p->dds_flags); 3717 if (get_ldev(device)) { 3718 dd = drbd_determine_dev_size(device, ddsf, NULL); 3719 put_ldev(device); 3720 if (dd == DS_ERROR) 3721 return -EIO; 3722 drbd_md_sync(device); 3723 } else { 3724 /* I am diskless, need to accept the peer's size. 
*/ 3725 drbd_set_my_capacity(device, p_size); 3726 } 3727 3728 if (get_ldev(device)) { 3729 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 3730 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 3731 ldsc = 1; 3732 } 3733 3734 put_ldev(device); 3735 } 3736 3737 if (device->state.conn > C_WF_REPORT_PARAMS) { 3738 if (be64_to_cpu(p->c_size) != 3739 drbd_get_capacity(device->this_bdev) || ldsc) { 3740 /* we have different sizes, probably peer 3741 * needs to know my new size... */ 3742 drbd_send_sizes(peer_device, 0, ddsf); 3743 } 3744 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 3745 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 3746 if (device->state.pdsk >= D_INCONSISTENT && 3747 device->state.disk >= D_INCONSISTENT) { 3748 if (ddsf & DDSF_NO_RESYNC) 3749 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 3750 else 3751 resync_after_online_grow(device); 3752 } else 3753 set_bit(RESYNC_AFTER_NEG, &device->flags); 3754 } 3755 } 3756 3757 return 0; 3758 } 3759 3760 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 3761 { 3762 struct drbd_peer_device *peer_device; 3763 struct drbd_device *device; 3764 struct p_uuids *p = pi->data; 3765 u64 *p_uuid; 3766 int i, updated_uuids = 0; 3767 3768 peer_device = conn_peer_device(connection, pi->vnr); 3769 if (!peer_device) 3770 return config_unknown_volume(connection, pi); 3771 device = peer_device->device; 3772 3773 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 3774 if (!p_uuid) { 3775 drbd_err(device, "kmalloc of p_uuid failed\n"); 3776 return false; 3777 } 3778 3779 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 3780 p_uuid[i] = be64_to_cpu(p->uuid[i]); 3781 3782 kfree(device->p_uuid); 3783 device->p_uuid = p_uuid; 3784 3785 if (device->state.conn < C_CONNECTED && 3786 device->state.disk < D_INCONSISTENT && 3787 device->state.role == R_PRIMARY && 3788 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 3789 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 3790 (unsigned long long)device->ed_uuid); 3791 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3792 return -EIO; 3793 } 3794 3795 if (get_ldev(device)) { 3796 int skip_initial_sync = 3797 device->state.conn == C_CONNECTED && 3798 peer_device->connection->agreed_pro_version >= 90 && 3799 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 3800 (p_uuid[UI_FLAGS] & 8); 3801 if (skip_initial_sync) { 3802 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 3803 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 3804 "clear_n_write from receive_uuids", 3805 BM_LOCKED_TEST_ALLOWED); 3806 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 3807 _drbd_uuid_set(device, UI_BITMAP, 0); 3808 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3809 CS_VERBOSE, NULL); 3810 drbd_md_sync(device); 3811 updated_uuids = 1; 3812 } 3813 put_ldev(device); 3814 } else if (device->state.disk < D_INCONSISTENT && 3815 device->state.role == R_PRIMARY) { 3816 /* I am a diskless primary, the peer just created a new current UUID 3817 for me. */ 3818 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3819 } 3820 3821 /* Before we test for the disk state, we should wait until an eventually 3822 ongoing cluster wide state change is finished. That is important if 3823 we are primary and are detaching from our disk. 
We need to see the 3824 new disk state... */ 3825 mutex_lock(device->state_mutex); 3826 mutex_unlock(device->state_mutex); 3827 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 3828 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3829 3830 if (updated_uuids) 3831 drbd_print_uuids(device, "receiver updated UUIDs to"); 3832 3833 return 0; 3834 } 3835 3836 /** 3837 * convert_state() - Converts the peer's view of the cluster state to our point of view 3838 * @ps: The state as seen by the peer. 3839 */ 3840 static union drbd_state convert_state(union drbd_state ps) 3841 { 3842 union drbd_state ms; 3843 3844 static enum drbd_conns c_tab[] = { 3845 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 3846 [C_CONNECTED] = C_CONNECTED, 3847 3848 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3849 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3850 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3851 [C_VERIFY_S] = C_VERIFY_T, 3852 [C_MASK] = C_MASK, 3853 }; 3854 3855 ms.i = ps.i; 3856 3857 ms.conn = c_tab[ps.conn]; 3858 ms.peer = ps.role; 3859 ms.role = ps.peer; 3860 ms.pdsk = ps.disk; 3861 ms.disk = ps.pdsk; 3862 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3863 3864 return ms; 3865 } 3866 3867 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 3868 { 3869 struct drbd_peer_device *peer_device; 3870 struct drbd_device *device; 3871 struct p_req_state *p = pi->data; 3872 union drbd_state mask, val; 3873 enum drbd_state_rv rv; 3874 3875 peer_device = conn_peer_device(connection, pi->vnr); 3876 if (!peer_device) 3877 return -EIO; 3878 device = peer_device->device; 3879 3880 mask.i = be32_to_cpu(p->mask); 3881 val.i = be32_to_cpu(p->val); 3882 3883 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 3884 mutex_is_locked(device->state_mutex)) { 3885 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 3886 return 0; 3887 } 3888 3889 mask = convert_state(mask); 3890 val = convert_state(val); 3891 3892 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 3893 drbd_send_sr_reply(peer_device, rv); 3894 3895 drbd_md_sync(device); 3896 3897 return 0; 3898 } 3899 3900 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 3901 { 3902 struct p_req_state *p = pi->data; 3903 union drbd_state mask, val; 3904 enum drbd_state_rv rv; 3905 3906 mask.i = be32_to_cpu(p->mask); 3907 val.i = be32_to_cpu(p->val); 3908 3909 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 3910 mutex_is_locked(&connection->cstate_mutex)) { 3911 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 3912 return 0; 3913 } 3914 3915 mask = convert_state(mask); 3916 val = convert_state(val); 3917 3918 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 3919 conn_send_sr_reply(connection, rv); 3920 3921 return 0; 3922 } 3923 3924 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 3925 { 3926 struct drbd_peer_device *peer_device; 3927 struct drbd_device *device; 3928 struct p_state *p = pi->data; 3929 union drbd_state os, ns, peer_state; 3930 enum drbd_disk_state real_peer_disk; 3931 enum chg_state_flags cs_flags; 3932 int rv; 3933 3934 peer_device = conn_peer_device(connection, pi->vnr); 3935 if (!peer_device) 3936 return config_unknown_volume(connection, pi); 3937 device = peer_device->device; 3938 3939 peer_state.i = be32_to_cpu(p->state); 3940 3941 real_peer_disk = peer_state.disk; 3942 if (peer_state.disk == D_NEGOTIATING) { 3943 
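/* the peer's disk is still D_NEGOTIATING; derive its probable disk state from the UUID flags it sent earlier: bit 0x4 is treated as "peer data is inconsistent", otherwise we assume D_CONSISTENT */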
real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 3944 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 3945 } 3946 3947 spin_lock_irq(&device->resource->req_lock); 3948 retry: 3949 os = ns = drbd_read_state(device); 3950 spin_unlock_irq(&device->resource->req_lock); 3951 3952 /* If some other part of the code (asender thread, timeout) 3953 * already decided to close the connection again, 3954 * we must not "re-establish" it here. */ 3955 if (os.conn <= C_TEAR_DOWN) 3956 return -ECONNRESET; 3957 3958 /* If this is the "end of sync" confirmation, usually the peer disk 3959 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 3960 * set) resync started in PausedSyncT, or if the timing of pause-/ 3961 * unpause-sync events has been "just right", the peer disk may 3962 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 3963 */ 3964 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 3965 real_peer_disk == D_UP_TO_DATE && 3966 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 3967 /* If we are (becoming) SyncSource, but peer is still in sync 3968 * preparation, ignore its uptodate-ness to avoid flapping, it 3969 * will change to inconsistent once the peer reaches active 3970 * syncing states. 3971 * It may have changed syncer-paused flags, however, so we 3972 * cannot ignore this completely. */ 3973 if (peer_state.conn > C_CONNECTED && 3974 peer_state.conn < C_SYNC_SOURCE) 3975 real_peer_disk = D_INCONSISTENT; 3976 3977 /* if peer_state changes to connected at the same time, 3978 * it explicitly notifies us that it finished resync. 3979 * Maybe we should finish it up, too? */ 3980 else if (os.conn >= C_SYNC_SOURCE && 3981 peer_state.conn == C_CONNECTED) { 3982 if (drbd_bm_total_weight(device) <= device->rs_failed) 3983 drbd_resync_finished(device); 3984 return 0; 3985 } 3986 } 3987 3988 /* explicit verify finished notification, stop sector reached. */ 3989 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 3990 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 3991 ov_out_of_sync_print(device); 3992 drbd_resync_finished(device); 3993 return 0; 3994 } 3995 3996 /* peer says his disk is inconsistent, while we think it is uptodate, 3997 * and this happens while the peer still thinks we have a sync going on, 3998 * but we think we are already done with the sync. 3999 * We ignore this to avoid flapping pdsk. 4000 * This should not happen, if the peer is a recent version of drbd. 
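(The check below therefore overrides real_peer_disk back to D_UP_TO_DATE in exactly that situation.)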
*/ 4001 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 4002 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 4003 real_peer_disk = D_UP_TO_DATE; 4004 4005 if (ns.conn == C_WF_REPORT_PARAMS) 4006 ns.conn = C_CONNECTED; 4007 4008 if (peer_state.conn == C_AHEAD) 4009 ns.conn = C_BEHIND; 4010 4011 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 4012 get_ldev_if_state(device, D_NEGOTIATING)) { 4013 int cr; /* consider resync */ 4014 4015 /* if we established a new connection */ 4016 cr = (os.conn < C_CONNECTED); 4017 /* if we had an established connection 4018 * and one of the nodes newly attaches a disk */ 4019 cr |= (os.conn == C_CONNECTED && 4020 (peer_state.disk == D_NEGOTIATING || 4021 os.disk == D_NEGOTIATING)); 4022 /* if we have both been inconsistent, and the peer has been 4023 * forced to be UpToDate with --overwrite-data */ 4024 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 4025 /* if we had been plain connected, and the admin requested to 4026 * start a sync by "invalidate" or "invalidate-remote" */ 4027 cr |= (os.conn == C_CONNECTED && 4028 (peer_state.conn >= C_STARTING_SYNC_S && 4029 peer_state.conn <= C_WF_BITMAP_T)); 4030 4031 if (cr) 4032 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 4033 4034 put_ldev(device); 4035 if (ns.conn == C_MASK) { 4036 ns.conn = C_CONNECTED; 4037 if (device->state.disk == D_NEGOTIATING) { 4038 drbd_force_state(device, NS(disk, D_FAILED)); 4039 } else if (peer_state.disk == D_NEGOTIATING) { 4040 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 4041 peer_state.disk = D_DISKLESS; 4042 real_peer_disk = D_DISKLESS; 4043 } else { 4044 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 4045 return -EIO; 4046 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 4047 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4048 return -EIO; 4049 } 4050 } 4051 } 4052 4053 spin_lock_irq(&device->resource->req_lock); 4054 if (os.i != drbd_read_state(device).i) 4055 goto retry; 4056 clear_bit(CONSIDER_RESYNC, &device->flags); 4057 ns.peer = peer_state.role; 4058 ns.pdsk = real_peer_disk; 4059 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 4060 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 4061 ns.disk = device->new_state_tmp.disk; 4062 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4063 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4064 test_bit(NEW_CUR_UUID, &device->flags)) { 4065 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4066 for temporal network outages! 
*/ 4067 spin_unlock_irq(&device->resource->req_lock); 4068 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4069 tl_clear(peer_device->connection); 4070 drbd_uuid_new_current(device); 4071 clear_bit(NEW_CUR_UUID, &device->flags); 4072 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4073 return -EIO; 4074 } 4075 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4076 ns = drbd_read_state(device); 4077 spin_unlock_irq(&device->resource->req_lock); 4078 4079 if (rv < SS_SUCCESS) { 4080 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4081 return -EIO; 4082 } 4083 4084 if (os.conn > C_WF_REPORT_PARAMS) { 4085 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4086 peer_state.disk != D_NEGOTIATING ) { 4087 /* we want resync, peer has not yet decided to sync... */ 4088 /* Nowadays only used when forcing a node into primary role and 4089 setting its disk to UpToDate with that */ 4090 drbd_send_uuids(peer_device); 4091 drbd_send_current_state(peer_device); 4092 } 4093 } 4094 4095 clear_bit(DISCARD_MY_DATA, &device->flags); 4096 4097 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4098 4099 return 0; 4100 } 4101 4102 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4103 { 4104 struct drbd_peer_device *peer_device; 4105 struct drbd_device *device; 4106 struct p_rs_uuid *p = pi->data; 4107 4108 peer_device = conn_peer_device(connection, pi->vnr); 4109 if (!peer_device) 4110 return -EIO; 4111 device = peer_device->device; 4112 4113 wait_event(device->misc_wait, 4114 device->state.conn == C_WF_SYNC_UUID || 4115 device->state.conn == C_BEHIND || 4116 device->state.conn < C_CONNECTED || 4117 device->state.disk < D_NEGOTIATING); 4118 4119 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4120 4121 /* Here the _drbd_uuid_ functions are right, current should 4122 _not_ be rotated into the history */ 4123 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4124 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4125 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4126 4127 drbd_print_uuids(device, "updated sync uuid"); 4128 drbd_start_resync(device, C_SYNC_TARGET); 4129 4130 put_ldev(device); 4131 } else 4132 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4133 4134 return 0; 4135 } 4136 4137 /** 4138 * receive_bitmap_plain 4139 * 4140 * Return 0 when done, 1 when another iteration is needed, and a negative error 4141 * code upon failure. 
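The payload size announced in the packet header must match exactly the amount of bitmap data this node still expects for the current chunk; any mismatch is treated as a protocol error.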
4142 */ 4143 static int 4144 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4145 unsigned long *p, struct bm_xfer_ctx *c) 4146 { 4147 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4148 drbd_header_size(peer_device->connection); 4149 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4150 c->bm_words - c->word_offset); 4151 unsigned int want = num_words * sizeof(*p); 4152 int err; 4153 4154 if (want != size) { 4155 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4156 return -EIO; 4157 } 4158 if (want == 0) 4159 return 0; 4160 err = drbd_recv_all(peer_device->connection, p, want); 4161 if (err) 4162 return err; 4163 4164 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4165 4166 c->word_offset += num_words; 4167 c->bit_offset = c->word_offset * BITS_PER_LONG; 4168 if (c->bit_offset > c->bm_bits) 4169 c->bit_offset = c->bm_bits; 4170 4171 return 1; 4172 } 4173 4174 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4175 { 4176 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4177 } 4178 4179 static int dcbp_get_start(struct p_compressed_bm *p) 4180 { 4181 return (p->encoding & 0x80) != 0; 4182 } 4183 4184 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4185 { 4186 return (p->encoding >> 4) & 0x7; 4187 } 4188 4189 /** 4190 * recv_bm_rle_bits 4191 * 4192 * Return 0 when done, 1 when another iteration is needed, and a negative error 4193 * code upon failure. 4194 */ 4195 static int 4196 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4197 struct p_compressed_bm *p, 4198 struct bm_xfer_ctx *c, 4199 unsigned int len) 4200 { 4201 struct bitstream bs; 4202 u64 look_ahead; 4203 u64 rl; 4204 u64 tmp; 4205 unsigned long s = c->bit_offset; 4206 unsigned long e; 4207 int toggle = dcbp_get_start(p); 4208 int have; 4209 int bits; 4210 4211 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4212 4213 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4214 if (bits < 0) 4215 return -EIO; 4216 4217 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4218 bits = vli_decode_bits(&rl, look_ahead); 4219 if (bits <= 0) 4220 return -EIO; 4221 4222 if (toggle) { 4223 e = s + rl -1; 4224 if (e >= c->bm_bits) { 4225 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4226 return -EIO; 4227 } 4228 _drbd_bm_set_bits(peer_device->device, s, e); 4229 } 4230 4231 if (have < bits) { 4232 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4233 have, bits, look_ahead, 4234 (unsigned int)(bs.cur.b - p->code), 4235 (unsigned int)bs.buf_len); 4236 return -EIO; 4237 } 4238 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4239 if (likely(bits < 64)) 4240 look_ahead >>= bits; 4241 else 4242 look_ahead = 0; 4243 have -= bits; 4244 4245 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4246 if (bits < 0) 4247 return -EIO; 4248 look_ahead |= tmp << have; 4249 have += bits; 4250 } 4251 4252 c->bit_offset = s; 4253 bm_xfer_ctx_bit_to_word_offset(c); 4254 4255 return (s != c->bm_bits); 4256 } 4257 4258 /** 4259 * decode_bitmap_c 4260 * 4261 * Return 0 when done, 1 when another iteration is needed, and a negative error 4262 * code upon failure. 
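Only the RLE_VLI_Bits encoding is handled here; any other encoding code is reported as a protocol error.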
4263 */
4264 static int
4265 decode_bitmap_c(struct drbd_peer_device *peer_device,
4266 struct p_compressed_bm *p,
4267 struct bm_xfer_ctx *c,
4268 unsigned int len)
4269 {
4270 if (dcbp_get_code(p) == RLE_VLI_Bits)
4271 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4272
4273 /* other variants had been implemented for evaluation,
4274 * but have been dropped as this one turned out to be "best"
4275 * during all our tests. */
4276
4277 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4278 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4279 return -EIO;
4280 }
4281
4282 void INFO_bm_xfer_stats(struct drbd_device *device,
4283 const char *direction, struct bm_xfer_ctx *c)
4284 {
4285 /* what would it take to transfer it "plaintext" */
4286 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4287 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4288 unsigned int plain =
4289 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4290 c->bm_words * sizeof(unsigned long);
4291 unsigned int total = c->bytes[0] + c->bytes[1];
4292 unsigned int r;
4293
4294 /* total cannot be zero, but just in case: */
4295 if (total == 0)
4296 return;
4297
4298 /* don't report if not compressed */
4299 if (total >= plain)
4300 return;
4301
4302 /* total < plain. check for overflow, still */
4303 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4304 : (1000 * total / plain);
4305
4306 if (r > 1000)
4307 r = 1000;
4308
4309 r = 1000 - r;
4310 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4311 "total %u; compression: %u.%u%%\n",
4312 direction,
4313 c->bytes[1], c->packets[1],
4314 c->bytes[0], c->packets[0],
4315 total, r/10, r % 10);
4316 }
4317
4318 /* Since we are processing the bitfield from lower addresses to higher,
4319 it does not matter whether we process it in 32 bit chunks or 64 bit
4320 chunks as long as it is little endian. (Understand it as byte stream,
4321 beginning with the lowest byte...) If we used big endian,
4322 we would need to process it from the highest address to the lowest,
4323 in order to be agnostic to the 32 vs 64 bits issue.
4324
4325 Returns 0 on success, or a negative error code on failure. */
4326 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4327 {
4328 struct drbd_peer_device *peer_device;
4329 struct drbd_device *device;
4330 struct bm_xfer_ctx c;
4331 int err;
4332
4333 peer_device = conn_peer_device(connection, pi->vnr);
4334 if (!peer_device)
4335 return -EIO;
4336 device = peer_device->device;
4337
4338 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4339 /* you are supposed to send additional out-of-sync information
4340 * if you actually set bits during this phase */
4341
4342 c = (struct bm_xfer_ctx) {
4343 .bm_bits = drbd_bm_bits(device),
4344 .bm_words = drbd_bm_words(device),
4345 };
4346
4347 for(;;) {
4348 if (pi->cmd == P_BITMAP)
4349 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4350 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4351 /* MAYBE: sanity check that we speak proto >= 90,
4352 * and the feature is enabled!
*/ 4353 struct p_compressed_bm *p = pi->data; 4354 4355 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) { 4356 drbd_err(device, "ReportCBitmap packet too large\n"); 4357 err = -EIO; 4358 goto out; 4359 } 4360 if (pi->size <= sizeof(*p)) { 4361 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size); 4362 err = -EIO; 4363 goto out; 4364 } 4365 err = drbd_recv_all(peer_device->connection, p, pi->size); 4366 if (err) 4367 goto out; 4368 err = decode_bitmap_c(peer_device, p, &c, pi->size); 4369 } else { 4370 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd); 4371 err = -EIO; 4372 goto out; 4373 } 4374 4375 c.packets[pi->cmd == P_BITMAP]++; 4376 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size; 4377 4378 if (err <= 0) { 4379 if (err < 0) 4380 goto out; 4381 break; 4382 } 4383 err = drbd_recv_header(peer_device->connection, pi); 4384 if (err) 4385 goto out; 4386 } 4387 4388 INFO_bm_xfer_stats(device, "receive", &c); 4389 4390 if (device->state.conn == C_WF_BITMAP_T) { 4391 enum drbd_state_rv rv; 4392 4393 err = drbd_send_bitmap(device); 4394 if (err) 4395 goto out; 4396 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 4397 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4398 D_ASSERT(device, rv == SS_SUCCESS); 4399 } else if (device->state.conn != C_WF_BITMAP_S) { 4400 /* admin may have requested C_DISCONNECTING, 4401 * other threads may have noticed network errors */ 4402 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4403 drbd_conn_str(device->state.conn)); 4404 } 4405 err = 0; 4406 4407 out: 4408 drbd_bm_unlock(device); 4409 if (!err && device->state.conn == C_WF_BITMAP_S) 4410 drbd_start_resync(device, C_SYNC_SOURCE); 4411 return err; 4412 } 4413 4414 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4415 { 4416 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4417 pi->cmd, pi->size); 4418 4419 return ignore_remaining_packet(connection, pi); 4420 } 4421 4422 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4423 { 4424 /* Make sure we've acked all the TCP data associated 4425 * with the data requests being unplugged */ 4426 drbd_tcp_quickack(connection->data.socket); 4427 4428 return 0; 4429 } 4430 4431 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4432 { 4433 struct drbd_peer_device *peer_device; 4434 struct drbd_device *device; 4435 struct p_block_desc *p = pi->data; 4436 4437 peer_device = conn_peer_device(connection, pi->vnr); 4438 if (!peer_device) 4439 return -EIO; 4440 device = peer_device->device; 4441 4442 switch (device->state.conn) { 4443 case C_WF_SYNC_UUID: 4444 case C_WF_BITMAP_T: 4445 case C_BEHIND: 4446 break; 4447 default: 4448 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4449 drbd_conn_str(device->state.conn)); 4450 } 4451 4452 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4453 4454 return 0; 4455 } 4456 4457 struct data_cmd { 4458 int expect_payload; 4459 size_t pkt_size; 4460 int (*fn)(struct drbd_connection *, struct packet_info *); 4461 }; 4462 4463 static struct data_cmd drbd_cmd_handler[] = { 4464 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4465 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4466 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), 
receive_RSDataReply } , 4467 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4468 [P_BITMAP] = { 1, 0, receive_bitmap } , 4469 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4470 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4471 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4472 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4473 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4474 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4475 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4476 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4477 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4478 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4479 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4480 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4481 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4482 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4483 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4484 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4485 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4486 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4487 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4488 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data }, 4489 }; 4490 4491 static void drbdd(struct drbd_connection *connection) 4492 { 4493 struct packet_info pi; 4494 size_t shs; /* sub header size */ 4495 int err; 4496 4497 while (get_t_state(&connection->receiver) == RUNNING) { 4498 struct data_cmd *cmd; 4499 4500 drbd_thread_current_set_cpu(&connection->receiver); 4501 if (drbd_recv_header(connection, &pi)) 4502 goto err_out; 4503 4504 cmd = &drbd_cmd_handler[pi.cmd]; 4505 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4506 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4507 cmdname(pi.cmd), pi.cmd); 4508 goto err_out; 4509 } 4510 4511 shs = cmd->pkt_size; 4512 if (pi.size > shs && !cmd->expect_payload) { 4513 drbd_err(connection, "No payload expected %s l:%d\n", 4514 cmdname(pi.cmd), pi.size); 4515 goto err_out; 4516 } 4517 4518 if (shs) { 4519 err = drbd_recv_all_warn(connection, pi.data, shs); 4520 if (err) 4521 goto err_out; 4522 pi.size -= shs; 4523 } 4524 4525 err = cmd->fn(connection, &pi); 4526 if (err) { 4527 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4528 cmdname(pi.cmd), err, pi.size); 4529 goto err_out; 4530 } 4531 } 4532 return; 4533 4534 err_out: 4535 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4536 } 4537 4538 static void conn_disconnect(struct drbd_connection *connection) 4539 { 4540 struct drbd_peer_device *peer_device; 4541 enum drbd_conns oc; 4542 int vnr; 4543 4544 if (connection->cstate == C_STANDALONE) 4545 return; 4546 4547 /* We are about to start the cleanup after connection loss. 4548 * Make sure drbd_make_request knows about that. 4549 * Usually we should be in some network failure state already, 4550 * but just in case we are not, we fix it up here. 4551 */ 4552 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4553 4554 /* asender does not clean up anything. 
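It only handles acknowledgement and ping traffic on the meta socket;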
it must not interfere, either */
4555 drbd_thread_stop(&connection->asender);
4556 drbd_free_sock(connection);
4557
4558 rcu_read_lock();
4559 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4560 struct drbd_device *device = peer_device->device;
4561 kref_get(&device->kref);
4562 rcu_read_unlock();
4563 drbd_disconnected(peer_device);
4564 kref_put(&device->kref, drbd_destroy_device);
4565 rcu_read_lock();
4566 }
4567 rcu_read_unlock();
4568
4569 if (!list_empty(&connection->current_epoch->list))
4570 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4571 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4572 atomic_set(&connection->current_epoch->epoch_size, 0);
4573 connection->send.seen_any_write_yet = false;
4574
4575 drbd_info(connection, "Connection closed\n");
4576
4577 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4578 conn_try_outdate_peer_async(connection);
4579
4580 spin_lock_irq(&connection->resource->req_lock);
4581 oc = connection->cstate;
4582 if (oc >= C_UNCONNECTED)
4583 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4584
4585 spin_unlock_irq(&connection->resource->req_lock);
4586
4587 if (oc == C_DISCONNECTING)
4588 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4589 }
4590
4591 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4592 {
4593 struct drbd_device *device = peer_device->device;
4594 unsigned int i;
4595
4596 /* wait for current activity to cease. */
4597 spin_lock_irq(&device->resource->req_lock);
4598 _drbd_wait_ee_list_empty(device, &device->active_ee);
4599 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4600 _drbd_wait_ee_list_empty(device, &device->read_ee);
4601 spin_unlock_irq(&device->resource->req_lock);
4602
4603 /* We do not have data structures that would allow us to
4604 * get the rs_pending_cnt down to 0 again.
4605 * * On C_SYNC_TARGET we do not have any data structures describing
4606 * the pending RSDataRequest's we have sent.
4607 * * On C_SYNC_SOURCE there is no data structure that tracks
4608 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4609 * And no, it is not the sum of the reference counts in the
4610 * resync_LRU. The resync_LRU tracks the whole operation including
4611 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4612 * on the fly. */
4613 drbd_rs_cancel_all(device);
4614 device->rs_total = 0;
4615 device->rs_failed = 0;
4616 atomic_set(&device->rs_pending_cnt, 0);
4617 wake_up(&device->misc_wait);
4618
4619 del_timer_sync(&device->resync_timer);
4620 resync_timer_fn((unsigned long)device);
4621
4622 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4623 * w_make_resync_request etc. which may still be on the worker queue
4624 * to be "canceled" */
4625 drbd_flush_workqueue(&peer_device->connection->sender_work);
4626
4627 drbd_finish_peer_reqs(device);
4628
4629 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4630 might have issued more work. The one before drbd_finish_peer_reqs() is
4631 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4632 drbd_flush_workqueue(&peer_device->connection->sender_work);
4633
4634 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4635 * again via drbd_try_clear_on_disk_bm().
*/ 4636 drbd_rs_cancel_all(device); 4637 4638 kfree(device->p_uuid); 4639 device->p_uuid = NULL; 4640 4641 if (!drbd_suspended(device)) 4642 tl_clear(peer_device->connection); 4643 4644 drbd_md_sync(device); 4645 4646 /* serialize with bitmap writeout triggered by the state change, 4647 * if any. */ 4648 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags)); 4649 4650 /* tcp_close and release of sendpage pages can be deferred. I don't 4651 * want to use SO_LINGER, because apparently it can be deferred for 4652 * more than 20 seconds (longest time I checked). 4653 * 4654 * Actually we don't care for exactly when the network stack does its 4655 * put_page(), but release our reference on these pages right here. 4656 */ 4657 i = drbd_free_peer_reqs(device, &device->net_ee); 4658 if (i) 4659 drbd_info(device, "net_ee not empty, killed %u entries\n", i); 4660 i = atomic_read(&device->pp_in_use_by_net); 4661 if (i) 4662 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 4663 i = atomic_read(&device->pp_in_use); 4664 if (i) 4665 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 4666 4667 D_ASSERT(device, list_empty(&device->read_ee)); 4668 D_ASSERT(device, list_empty(&device->active_ee)); 4669 D_ASSERT(device, list_empty(&device->sync_ee)); 4670 D_ASSERT(device, list_empty(&device->done_ee)); 4671 4672 return 0; 4673 } 4674 4675 /* 4676 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 4677 * we can agree on is stored in agreed_pro_version. 4678 * 4679 * feature flags and the reserved array should be enough room for future 4680 * enhancements of the handshake protocol, and possible plugins... 4681 * 4682 * for now, they are expected to be zero, but ignored. 4683 */ 4684 static int drbd_send_features(struct drbd_connection *connection) 4685 { 4686 struct drbd_socket *sock; 4687 struct p_connection_features *p; 4688 4689 sock = &connection->data; 4690 p = conn_prepare_command(connection, sock); 4691 if (!p) 4692 return -EIO; 4693 memset(p, 0, sizeof(*p)); 4694 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 4695 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 4696 p->feature_flags = cpu_to_be32(PRO_FEATURES); 4697 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 4698 } 4699 4700 /* 4701 * return values: 4702 * 1 yes, we have a valid connection 4703 * 0 oops, did not work out, please try again 4704 * -1 peer talks different language, 4705 * no point in trying again, please go standalone. 4706 */ 4707 static int drbd_do_features(struct drbd_connection *connection) 4708 { 4709 /* ASSERT current == connection->receiver ... 
*/
4710 struct p_connection_features *p;
4711 const int expect = sizeof(struct p_connection_features);
4712 struct packet_info pi;
4713 int err;
4714
4715 err = drbd_send_features(connection);
4716 if (err)
4717 return 0;
4718
4719 err = drbd_recv_header(connection, &pi);
4720 if (err)
4721 return 0;
4722
4723 if (pi.cmd != P_CONNECTION_FEATURES) {
4724 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4725 cmdname(pi.cmd), pi.cmd);
4726 return -1;
4727 }
4728
4729 if (pi.size != expect) {
4730 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4731 expect, pi.size);
4732 return -1;
4733 }
4734
4735 p = pi.data;
4736 err = drbd_recv_all_warn(connection, p, expect);
4737 if (err)
4738 return 0;
4739
4740 p->protocol_min = be32_to_cpu(p->protocol_min);
4741 p->protocol_max = be32_to_cpu(p->protocol_max);
4742 if (p->protocol_max == 0)
4743 p->protocol_max = p->protocol_min;
4744
4745 if (PRO_VERSION_MAX < p->protocol_min ||
4746 PRO_VERSION_MIN > p->protocol_max)
4747 goto incompat;
4748
4749 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4750 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4751
4752 drbd_info(connection, "Handshake successful: "
4753 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4754
4755 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4756 connection->agreed_features & FF_TRIM ? " " : " not ");
4757
4758 return 1;
4759
4760 incompat:
4761 drbd_err(connection, "incompatible DRBD dialects: "
4762 "I support %d-%d, peer supports %d-%d\n",
4763 PRO_VERSION_MIN, PRO_VERSION_MAX,
4764 p->protocol_min, p->protocol_max);
4765 return -1;
4766 }
4767
4768 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4769 static int drbd_do_auth(struct drbd_connection *connection)
4770 {
4771 drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4772 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4773 return -1;
4774 }
4775 #else
4776 #define CHALLENGE_LEN 64
4777
4778 /* Return value:
4779 1 - auth succeeded,
4780 0 - failed, try again (network error),
4781 -1 - auth failed, don't try again.
4782 */
4783
4784 static int drbd_do_auth(struct drbd_connection *connection)
4785 {
4786 struct drbd_socket *sock;
4787 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4788 struct scatterlist sg;
4789 char *response = NULL;
4790 char *right_response = NULL;
4791 char *peers_ch = NULL;
4792 unsigned int key_len;
4793 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4794 unsigned int resp_size;
4795 struct hash_desc desc;
4796 struct packet_info pi;
4797 struct net_conf *nc;
4798 int err, rv;
4799
4800 /* FIXME: Put the challenge/response into the preallocated socket buffer.
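Right now my_challenge lives on the stack, and peers_ch / response / right_response are kmalloc()ed for every handshake attempt.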
*/ 4801 4802 rcu_read_lock(); 4803 nc = rcu_dereference(connection->net_conf); 4804 key_len = strlen(nc->shared_secret); 4805 memcpy(secret, nc->shared_secret, key_len); 4806 rcu_read_unlock(); 4807 4808 desc.tfm = connection->cram_hmac_tfm; 4809 desc.flags = 0; 4810 4811 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 4812 if (rv) { 4813 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv); 4814 rv = -1; 4815 goto fail; 4816 } 4817 4818 get_random_bytes(my_challenge, CHALLENGE_LEN); 4819 4820 sock = &connection->data; 4821 if (!conn_prepare_command(connection, sock)) { 4822 rv = 0; 4823 goto fail; 4824 } 4825 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 4826 my_challenge, CHALLENGE_LEN); 4827 if (!rv) 4828 goto fail; 4829 4830 err = drbd_recv_header(connection, &pi); 4831 if (err) { 4832 rv = 0; 4833 goto fail; 4834 } 4835 4836 if (pi.cmd != P_AUTH_CHALLENGE) { 4837 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 4838 cmdname(pi.cmd), pi.cmd); 4839 rv = 0; 4840 goto fail; 4841 } 4842 4843 if (pi.size > CHALLENGE_LEN * 2) { 4844 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 4845 rv = -1; 4846 goto fail; 4847 } 4848 4849 if (pi.size < CHALLENGE_LEN) { 4850 drbd_err(connection, "AuthChallenge payload too small.\n"); 4851 rv = -1; 4852 goto fail; 4853 } 4854 4855 peers_ch = kmalloc(pi.size, GFP_NOIO); 4856 if (peers_ch == NULL) { 4857 drbd_err(connection, "kmalloc of peers_ch failed\n"); 4858 rv = -1; 4859 goto fail; 4860 } 4861 4862 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 4863 if (err) { 4864 rv = 0; 4865 goto fail; 4866 } 4867 4868 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 4869 drbd_err(connection, "Peer presented the same challenge!\n"); 4870 rv = -1; 4871 goto fail; 4872 } 4873 4874 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm); 4875 response = kmalloc(resp_size, GFP_NOIO); 4876 if (response == NULL) { 4877 drbd_err(connection, "kmalloc of response failed\n"); 4878 rv = -1; 4879 goto fail; 4880 } 4881 4882 sg_init_table(&sg, 1); 4883 sg_set_buf(&sg, peers_ch, pi.size); 4884 4885 rv = crypto_hash_digest(&desc, &sg, sg.length, response); 4886 if (rv) { 4887 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4888 rv = -1; 4889 goto fail; 4890 } 4891 4892 if (!conn_prepare_command(connection, sock)) { 4893 rv = 0; 4894 goto fail; 4895 } 4896 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 4897 response, resp_size); 4898 if (!rv) 4899 goto fail; 4900 4901 err = drbd_recv_header(connection, &pi); 4902 if (err) { 4903 rv = 0; 4904 goto fail; 4905 } 4906 4907 if (pi.cmd != P_AUTH_RESPONSE) { 4908 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 4909 cmdname(pi.cmd), pi.cmd); 4910 rv = 0; 4911 goto fail; 4912 } 4913 4914 if (pi.size != resp_size) { 4915 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 4916 rv = 0; 4917 goto fail; 4918 } 4919 4920 err = drbd_recv_all_warn(connection, response , resp_size); 4921 if (err) { 4922 rv = 0; 4923 goto fail; 4924 } 4925 4926 right_response = kmalloc(resp_size, GFP_NOIO); 4927 if (right_response == NULL) { 4928 drbd_err(connection, "kmalloc of right_response failed\n"); 4929 rv = -1; 4930 goto fail; 4931 } 4932 4933 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN); 4934 4935 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response); 4936 if (rv) { 4937 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4938 
rv = -1; 4939 goto fail; 4940 } 4941 4942 rv = !memcmp(response, right_response, resp_size); 4943 4944 if (rv) 4945 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 4946 resp_size); 4947 else 4948 rv = -1; 4949 4950 fail: 4951 kfree(peers_ch); 4952 kfree(response); 4953 kfree(right_response); 4954 4955 return rv; 4956 } 4957 #endif 4958 4959 int drbd_receiver(struct drbd_thread *thi) 4960 { 4961 struct drbd_connection *connection = thi->connection; 4962 int h; 4963 4964 drbd_info(connection, "receiver (re)started\n"); 4965 4966 do { 4967 h = conn_connect(connection); 4968 if (h == 0) { 4969 conn_disconnect(connection); 4970 schedule_timeout_interruptible(HZ); 4971 } 4972 if (h == -1) { 4973 drbd_warn(connection, "Discarding network configuration.\n"); 4974 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 4975 } 4976 } while (h == 0); 4977 4978 if (h > 0) 4979 drbdd(connection); 4980 4981 conn_disconnect(connection); 4982 4983 drbd_info(connection, "receiver terminated\n"); 4984 return 0; 4985 } 4986 4987 /* ********* acknowledge sender ******** */ 4988 4989 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 4990 { 4991 struct p_req_state_reply *p = pi->data; 4992 int retcode = be32_to_cpu(p->retcode); 4993 4994 if (retcode >= SS_SUCCESS) { 4995 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 4996 } else { 4997 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 4998 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 4999 drbd_set_st_err_str(retcode), retcode); 5000 } 5001 wake_up(&connection->ping_wait); 5002 5003 return 0; 5004 } 5005 5006 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5007 { 5008 struct drbd_peer_device *peer_device; 5009 struct drbd_device *device; 5010 struct p_req_state_reply *p = pi->data; 5011 int retcode = be32_to_cpu(p->retcode); 5012 5013 peer_device = conn_peer_device(connection, pi->vnr); 5014 if (!peer_device) 5015 return -EIO; 5016 device = peer_device->device; 5017 5018 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 5019 D_ASSERT(device, connection->agreed_pro_version < 100); 5020 return got_conn_RqSReply(connection, pi); 5021 } 5022 5023 if (retcode >= SS_SUCCESS) { 5024 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 5025 } else { 5026 set_bit(CL_ST_CHG_FAIL, &device->flags); 5027 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 5028 drbd_set_st_err_str(retcode), retcode); 5029 } 5030 wake_up(&device->state_wait); 5031 5032 return 0; 5033 } 5034 5035 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 5036 { 5037 return drbd_send_ping_ack(connection); 5038 5039 } 5040 5041 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 5042 { 5043 /* restore idle timeout */ 5044 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 5045 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 5046 wake_up(&connection->ping_wait); 5047 5048 return 0; 5049 } 5050 5051 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 5052 { 5053 struct drbd_peer_device *peer_device; 5054 struct drbd_device *device; 5055 struct p_block_ack *p = pi->data; 5056 sector_t sector = be64_to_cpu(p->sector); 5057 int blksize = be32_to_cpu(p->blksize); 5058 5059 peer_device = conn_peer_device(connection, pi->vnr); 5060 if (!peer_device) 5061 return -EIO; 5062 device = peer_device->device; 5063 5064 D_ASSERT(device, 
peer_device->connection->agreed_pro_version >= 89); 5065 5066 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5067 5068 if (get_ldev(device)) { 5069 drbd_rs_complete_io(device, sector); 5070 drbd_set_in_sync(device, sector, blksize); 5071 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 5072 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 5073 put_ldev(device); 5074 } 5075 dec_rs_pending(device); 5076 atomic_add(blksize >> 9, &device->rs_sect_in); 5077 5078 return 0; 5079 } 5080 5081 static int 5082 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector, 5083 struct rb_root *root, const char *func, 5084 enum drbd_req_event what, bool missing_ok) 5085 { 5086 struct drbd_request *req; 5087 struct bio_and_error m; 5088 5089 spin_lock_irq(&device->resource->req_lock); 5090 req = find_request(device, root, id, sector, missing_ok, func); 5091 if (unlikely(!req)) { 5092 spin_unlock_irq(&device->resource->req_lock); 5093 return -EIO; 5094 } 5095 __req_mod(req, what, &m); 5096 spin_unlock_irq(&device->resource->req_lock); 5097 5098 if (m.bio) 5099 complete_master_bio(device, &m); 5100 return 0; 5101 } 5102 5103 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5104 { 5105 struct drbd_peer_device *peer_device; 5106 struct drbd_device *device; 5107 struct p_block_ack *p = pi->data; 5108 sector_t sector = be64_to_cpu(p->sector); 5109 int blksize = be32_to_cpu(p->blksize); 5110 enum drbd_req_event what; 5111 5112 peer_device = conn_peer_device(connection, pi->vnr); 5113 if (!peer_device) 5114 return -EIO; 5115 device = peer_device->device; 5116 5117 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5118 5119 if (p->block_id == ID_SYNCER) { 5120 drbd_set_in_sync(device, sector, blksize); 5121 dec_rs_pending(device); 5122 return 0; 5123 } 5124 switch (pi->cmd) { 5125 case P_RS_WRITE_ACK: 5126 what = WRITE_ACKED_BY_PEER_AND_SIS; 5127 break; 5128 case P_WRITE_ACK: 5129 what = WRITE_ACKED_BY_PEER; 5130 break; 5131 case P_RECV_ACK: 5132 what = RECV_ACKED_BY_PEER; 5133 break; 5134 case P_SUPERSEDED: 5135 what = CONFLICT_RESOLVED; 5136 break; 5137 case P_RETRY_WRITE: 5138 what = POSTPONE_WRITE; 5139 break; 5140 default: 5141 BUG(); 5142 } 5143 5144 return validate_req_change_req_state(device, p->block_id, sector, 5145 &device->write_requests, __func__, 5146 what, false); 5147 } 5148 5149 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5150 { 5151 struct drbd_peer_device *peer_device; 5152 struct drbd_device *device; 5153 struct p_block_ack *p = pi->data; 5154 sector_t sector = be64_to_cpu(p->sector); 5155 int size = be32_to_cpu(p->blksize); 5156 int err; 5157 5158 peer_device = conn_peer_device(connection, pi->vnr); 5159 if (!peer_device) 5160 return -EIO; 5161 device = peer_device->device; 5162 5163 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5164 5165 if (p->block_id == ID_SYNCER) { 5166 dec_rs_pending(device); 5167 drbd_rs_failed_io(device, sector, size); 5168 return 0; 5169 } 5170 5171 err = validate_req_change_req_state(device, p->block_id, sector, 5172 &device->write_requests, __func__, 5173 NEG_ACKED, true); 5174 if (err) { 5175 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5176 The master bio might already be completed, therefore the 5177 request is no longer in the collision hash. */ 5178 /* In Protocol B we might already have got a P_RECV_ACK 5179 but then get a P_NEG_ACK afterwards. 
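Either way the block must be considered out of sync on the peer, which is what drbd_set_out_of_sync() below takes care of.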
*/ 5180 drbd_set_out_of_sync(device, sector, size); 5181 } 5182 return 0; 5183 } 5184 5185 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5186 { 5187 struct drbd_peer_device *peer_device; 5188 struct drbd_device *device; 5189 struct p_block_ack *p = pi->data; 5190 sector_t sector = be64_to_cpu(p->sector); 5191 5192 peer_device = conn_peer_device(connection, pi->vnr); 5193 if (!peer_device) 5194 return -EIO; 5195 device = peer_device->device; 5196 5197 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5198 5199 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5200 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5201 5202 return validate_req_change_req_state(device, p->block_id, sector, 5203 &device->read_requests, __func__, 5204 NEG_ACKED, false); 5205 } 5206 5207 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5208 { 5209 struct drbd_peer_device *peer_device; 5210 struct drbd_device *device; 5211 sector_t sector; 5212 int size; 5213 struct p_block_ack *p = pi->data; 5214 5215 peer_device = conn_peer_device(connection, pi->vnr); 5216 if (!peer_device) 5217 return -EIO; 5218 device = peer_device->device; 5219 5220 sector = be64_to_cpu(p->sector); 5221 size = be32_to_cpu(p->blksize); 5222 5223 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5224 5225 dec_rs_pending(device); 5226 5227 if (get_ldev_if_state(device, D_FAILED)) { 5228 drbd_rs_complete_io(device, sector); 5229 switch (pi->cmd) { 5230 case P_NEG_RS_DREPLY: 5231 drbd_rs_failed_io(device, sector, size); 5232 case P_RS_CANCEL: 5233 break; 5234 default: 5235 BUG(); 5236 } 5237 put_ldev(device); 5238 } 5239 5240 return 0; 5241 } 5242 5243 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5244 { 5245 struct p_barrier_ack *p = pi->data; 5246 struct drbd_peer_device *peer_device; 5247 int vnr; 5248 5249 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5250 5251 rcu_read_lock(); 5252 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5253 struct drbd_device *device = peer_device->device; 5254 5255 if (device->state.conn == C_AHEAD && 5256 atomic_read(&device->ap_in_flight) == 0 && 5257 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5258 device->start_resync_timer.expires = jiffies + HZ; 5259 add_timer(&device->start_resync_timer); 5260 } 5261 } 5262 rcu_read_unlock(); 5263 5264 return 0; 5265 } 5266 5267 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5268 { 5269 struct drbd_peer_device *peer_device; 5270 struct drbd_device *device; 5271 struct p_block_ack *p = pi->data; 5272 struct drbd_device_work *dw; 5273 sector_t sector; 5274 int size; 5275 5276 peer_device = conn_peer_device(connection, pi->vnr); 5277 if (!peer_device) 5278 return -EIO; 5279 device = peer_device->device; 5280 5281 sector = be64_to_cpu(p->sector); 5282 size = be32_to_cpu(p->blksize); 5283 5284 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5285 5286 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5287 drbd_ov_out_of_sync_found(device, sector, size); 5288 else 5289 ov_out_of_sync_print(device); 5290 5291 if (!get_ldev(device)) 5292 return 0; 5293 5294 drbd_rs_complete_io(device, sector); 5295 dec_rs_pending(device); 5296 5297 --device->ov_left; 5298 5299 /* let's advance progress step marks only for every other megabyte */ 5300 if ((device->ov_left & 0x200) == 0x200) 5301 drbd_advance_rs_marks(device, device->ov_left); 5302 5303 if (device->ov_left 
== 0) { 5304 dw = kmalloc(sizeof(*dw), GFP_NOIO); 5305 if (dw) { 5306 dw->w.cb = w_ov_finished; 5307 dw->device = device; 5308 drbd_queue_work(&peer_device->connection->sender_work, &dw->w); 5309 } else { 5310 drbd_err(device, "kmalloc(dw) failed."); 5311 ov_out_of_sync_print(device); 5312 drbd_resync_finished(device); 5313 } 5314 } 5315 put_ldev(device); 5316 return 0; 5317 } 5318 5319 static int got_skip(struct drbd_connection *connection, struct packet_info *pi) 5320 { 5321 return 0; 5322 } 5323 5324 static int connection_finish_peer_reqs(struct drbd_connection *connection) 5325 { 5326 struct drbd_peer_device *peer_device; 5327 int vnr, not_empty = 0; 5328 5329 do { 5330 clear_bit(SIGNAL_ASENDER, &connection->flags); 5331 flush_signals(current); 5332 5333 rcu_read_lock(); 5334 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5335 struct drbd_device *device = peer_device->device; 5336 kref_get(&device->kref); 5337 rcu_read_unlock(); 5338 if (drbd_finish_peer_reqs(device)) { 5339 kref_put(&device->kref, drbd_destroy_device); 5340 return 1; 5341 } 5342 kref_put(&device->kref, drbd_destroy_device); 5343 rcu_read_lock(); 5344 } 5345 set_bit(SIGNAL_ASENDER, &connection->flags); 5346 5347 spin_lock_irq(&connection->resource->req_lock); 5348 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5349 struct drbd_device *device = peer_device->device; 5350 not_empty = !list_empty(&device->done_ee); 5351 if (not_empty) 5352 break; 5353 } 5354 spin_unlock_irq(&connection->resource->req_lock); 5355 rcu_read_unlock(); 5356 } while (not_empty); 5357 5358 return 0; 5359 } 5360 5361 struct asender_cmd { 5362 size_t pkt_size; 5363 int (*fn)(struct drbd_connection *connection, struct packet_info *); 5364 }; 5365 5366 static struct asender_cmd asender_tbl[] = { 5367 [P_PING] = { 0, got_Ping }, 5368 [P_PING_ACK] = { 0, got_PingAck }, 5369 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5370 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5371 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5372 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck }, 5373 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck }, 5374 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply }, 5375 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5376 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult }, 5377 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, 5378 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, 5379 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, 5380 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, 5381 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5382 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply }, 5383 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck }, 5384 }; 5385 5386 int drbd_asender(struct drbd_thread *thi) 5387 { 5388 struct drbd_connection *connection = thi->connection; 5389 struct asender_cmd *cmd = NULL; 5390 struct packet_info pi; 5391 int rv; 5392 void *buf = connection->meta.rbuf; 5393 int received = 0; 5394 unsigned int header_size = drbd_header_size(connection); 5395 int expect = header_size; 5396 bool ping_timeout_active = false; 5397 struct net_conf *nc; 5398 int ping_timeo, tcp_cork, ping_int; 5399 struct sched_param param = { .sched_priority = 2 }; 5400 5401 rv = sched_setscheduler(current, SCHED_RR, ¶m); 5402 if (rv < 0) 5403 
drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv); 5404 5405 while (get_t_state(thi) == RUNNING) { 5406 drbd_thread_current_set_cpu(thi); 5407 5408 rcu_read_lock(); 5409 nc = rcu_dereference(connection->net_conf); 5410 ping_timeo = nc->ping_timeo; 5411 tcp_cork = nc->tcp_cork; 5412 ping_int = nc->ping_int; 5413 rcu_read_unlock(); 5414 5415 if (test_and_clear_bit(SEND_PING, &connection->flags)) { 5416 if (drbd_send_ping(connection)) { 5417 drbd_err(connection, "drbd_send_ping has failed\n"); 5418 goto reconnect; 5419 } 5420 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10; 5421 ping_timeout_active = true; 5422 } 5423 5424 /* TODO: conditionally cork; it may hurt latency if we cork without 5425 much to send */ 5426 if (tcp_cork) 5427 drbd_tcp_cork(connection->meta.socket); 5428 if (connection_finish_peer_reqs(connection)) { 5429 drbd_err(connection, "connection_finish_peer_reqs() failed\n"); 5430 goto reconnect; 5431 } 5432 /* but unconditionally uncork unless disabled */ 5433 if (tcp_cork) 5434 drbd_tcp_uncork(connection->meta.socket); 5435 5436 /* short circuit, recv_msg would return EINTR anyways. */ 5437 if (signal_pending(current)) 5438 continue; 5439 5440 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0); 5441 clear_bit(SIGNAL_ASENDER, &connection->flags); 5442 5443 flush_signals(current); 5444 5445 /* Note: 5446 * -EINTR (on meta) we got a signal 5447 * -EAGAIN (on meta) rcvtimeo expired 5448 * -ECONNRESET other side closed the connection 5449 * -ERESTARTSYS (on data) we got a signal 5450 * rv < 0 other than above: unexpected error! 5451 * rv == expected: full header or command 5452 * rv < expected: "woken" by signal during receive 5453 * rv == 0 : "connection shut down by peer" 5454 */ 5455 if (likely(rv > 0)) { 5456 received += rv; 5457 buf += rv; 5458 } else if (rv == 0) { 5459 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 5460 long t; 5461 rcu_read_lock(); 5462 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 5463 rcu_read_unlock(); 5464 5465 t = wait_event_timeout(connection->ping_wait, 5466 connection->cstate < C_WF_REPORT_PARAMS, 5467 t); 5468 if (t) 5469 break; 5470 } 5471 drbd_err(connection, "meta connection shut down by peer.\n"); 5472 goto reconnect; 5473 } else if (rv == -EAGAIN) { 5474 /* If the data socket received something meanwhile, 5475 * that is good enough: peer is still alive. 
		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &asender_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf = connection->meta.rbuf;
			received = 0;
			expect = header_size;
			cmd = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &connection->flags);

	drbd_info(connection, "asender terminated\n");

	return 0;
}