/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
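/*
 * Conceptually, a chain of three pages A -> B -> C is represented as:
 *
 *	head -> A,	page_private(A) == (unsigned long)B,
 *		B,	page_private(B) == (unsigned long)C,
 *		C,	page_private(C) == 0	(end-of-chain marker)
 *
 * page_chain_next(), page_chain_for_each() and friends simply follow these
 * private pointers; no struct list_head is involved.
 */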
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
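/*
 * Note: drbd_pp_pool, drbd_pp_vacant and drbd_pp_lock are module-global; the
 * pool of pre-allocated pages is shared by all DRBD devices.  The per-device
 * counters (device->pp_in_use, device->pp_in_use_by_net) only track how many
 * of those pages a particular device currently holds.
 */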
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop examining the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
 * @number:		number of pages requested
 * @retry:		whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as a hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
"pp_in_use_by_net" : "pp_in_use", i); 323 wake_up(&drbd_pp_wait); 324 } 325 326 /* 327 You need to hold the req_lock: 328 _drbd_wait_ee_list_empty() 329 330 You must not have the req_lock: 331 drbd_free_peer_req() 332 drbd_alloc_peer_req() 333 drbd_free_peer_reqs() 334 drbd_ee_fix_bhs() 335 drbd_finish_peer_reqs() 336 drbd_clear_done_ee() 337 drbd_wait_ee_list_empty() 338 */ 339 340 struct drbd_peer_request * 341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 342 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local) 343 { 344 struct drbd_device *device = peer_device->device; 345 struct drbd_peer_request *peer_req; 346 struct page *page = NULL; 347 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; 348 349 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE)) 350 return NULL; 351 352 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 353 if (!peer_req) { 354 if (!(gfp_mask & __GFP_NOWARN)) 355 drbd_err(device, "%s: allocation failed\n", __func__); 356 return NULL; 357 } 358 359 if (has_payload && data_size) { 360 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT)); 361 if (!page) 362 goto fail; 363 } 364 365 drbd_clear_interval(&peer_req->i); 366 peer_req->i.size = data_size; 367 peer_req->i.sector = sector; 368 peer_req->i.local = false; 369 peer_req->i.waiting = false; 370 371 peer_req->epoch = NULL; 372 peer_req->peer_device = peer_device; 373 peer_req->pages = page; 374 atomic_set(&peer_req->pending_bios, 0); 375 peer_req->flags = 0; 376 /* 377 * The block_id is opaque to the receiver. It is not endianness 378 * converted, and sent back to the sender unchanged. 379 */ 380 peer_req->block_id = id; 381 382 return peer_req; 383 384 fail: 385 mempool_free(peer_req, drbd_ee_mempool); 386 return NULL; 387 } 388 389 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req, 390 int is_net) 391 { 392 if (peer_req->flags & EE_HAS_DIGEST) 393 kfree(peer_req->digest); 394 drbd_free_pages(device, peer_req->pages, is_net); 395 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); 396 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 397 mempool_free(peer_req, drbd_ee_mempool); 398 } 399 400 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) 401 { 402 LIST_HEAD(work_list); 403 struct drbd_peer_request *peer_req, *t; 404 int count = 0; 405 int is_net = list == &device->net_ee; 406 407 spin_lock_irq(&device->resource->req_lock); 408 list_splice_init(list, &work_list); 409 spin_unlock_irq(&device->resource->req_lock); 410 411 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 412 __drbd_free_peer_req(device, peer_req, is_net); 413 count++; 414 } 415 return count; 416 } 417 418 /* 419 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier. 420 */ 421 static int drbd_finish_peer_reqs(struct drbd_device *device) 422 { 423 LIST_HEAD(work_list); 424 LIST_HEAD(reclaimed); 425 struct drbd_peer_request *peer_req, *t; 426 int err = 0; 427 428 spin_lock_irq(&device->resource->req_lock); 429 reclaim_finished_net_peer_reqs(device, &reclaimed); 430 list_splice_init(&device->done_ee, &work_list); 431 spin_unlock_irq(&device->resource->req_lock); 432 433 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) 434 drbd_free_net_peer_req(device, peer_req); 435 436 /* possible callbacks here: 437 * e_end_block, and e_end_resync_block, e_send_superseded. 438 * all ignore the last argument. 
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}
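/*
 * Return value convention of the receive helpers above:
 *	drbd_recv_short(), drbd_recv():  number of bytes received; 0 on orderly
 *			shutdown by the peer; negative errno on error.
 *	drbd_recv_all(), drbd_recv_all_warn():  0 only if exactly "size" bytes
 *			arrived; otherwise a negative errno (short reads are
 *			mapped to -EIO).
 */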
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}
static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
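/*
 * Connection establishment below uses two TCP sockets per peer: the "data"
 * socket and the "meta" socket (msock).  Both nodes connect and listen at the
 * same time; each side announces the intended role of a freshly established
 * socket with P_INITIAL_DATA or P_INITIAL_META.  If the attempts cross, the
 * surplus socket is dropped, and the RESOLVE_CONFLICTS flag is set or cleared
 * depending on which side's meta connection was kept, so that both nodes
 * later agree on a tie-breaker.
 */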
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock.socket && msock.socket) {
			rcu_read_lock();
			nc = rcu_dereference(connection->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->asender);

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
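/*
 * Wire header formats recognized by decode_header(), selected via
 * drbd_header_size() according to the agreed protocol version (all fields
 * are big endian):
 *
 *	p_header80:  32-bit magic DRBD_MAGIC,     16-bit command, 16-bit length
 *	p_header95:  16-bit magic DRBD_MAGIC_BIG, 16-bit command, 32-bit length
 *	p_header100: 32-bit magic DRBD_MAGIC_100, 16-bit volume,  16-bit command,
 *		     32-bit length, and a pad field that must be zero
 */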
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}
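/*
 * Return values of drbd_may_finish_epoch():
 *	FE_STILL_LIVE - the epoch could not be finished yet (it still has
 *			active requests, or no barrier number was seen)
 *	FE_DESTROYED  - a completed, non-current epoch was acknowledged and freed
 *	FE_RECYCLED   - the current epoch was completed and reset for reuse
 */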
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
	struct disk_conf *dc;
	struct drbd_peer_device *peer_device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = connection->write_ordering;
	wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (!get_ldev_if_state(device, D_ATTACHING))
			continue;
		dc = rcu_dereference(device->ldev->disk_conf);

		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
		put_ldev(device);
	}
	rcu_read_unlock();
	connection->write_ordering = wo;
	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 * single page to an empty bio (which should never happen and likely indicates
 * that the lower level IO stack is in some way broken). This has been observed
 * on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(first_peer_device(device)->connection);
		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
			sector, ds >> 9, GFP_NOIO))
			peer_req->flags |= EE_WAS_ERROR;
		drbd_endio_write_sec_final(peer_req);
		return 0;
	}

	/* Discards don't have any payload.
	 * But the scsi layer still expects a bio_vec it can use internally,
	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
	if (peer_req->flags & EE_IS_TRIM)
		nr_pages = 1;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	if (rw & REQ_DISCARD) {
		bio->bi_iter.bi_size = ds;
		goto submit;
	}

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				drbd_err(device,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, ds == 0);
submit:
	D_ASSERT(device, page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete.  */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
	return idr_find(&connection->peer_devices, volume_number);
}

static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	int data_size = pi->size;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;

	dgs = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return NULL;
		data_size -= dgs;
	}

	if (trim) {
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
	if (!peer_req)
		return NULL;

	if (trim)
		return peer_req;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size>>9;
	return peer_req;
}
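/*
 * Note on data integrity: if a peer_integrity_tfm is configured, each data
 * packet carries a digest of "dgs" bytes in front of the payload.
 * read_in_block() receives that digest first, then the payload pages,
 * recomputes the checksum with drbd_csum_ee() and drops the request if the
 * two do not match.
 */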
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(peer_device, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(peer_device->device, page, 0);
	return err;
}

static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int dgs, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	dgs = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}

/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(device, sector, peer_req->i.size);

		err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
	}
	dec_unacked(device);

	return err;
}
static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&device->resource->req_lock);
	list_add(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}

static struct drbd_request *
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}

static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}

static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}

static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}
*/ 1894 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 1895 spin_lock_irq(&device->resource->req_lock); 1896 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 1897 drbd_remove_epoch_entry_interval(device, peer_req); 1898 if (peer_req->flags & EE_RESTART_REQUESTS) 1899 restart_conflicting_writes(device, sector, peer_req->i.size); 1900 spin_unlock_irq(&device->resource->req_lock); 1901 } else 1902 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1903 1904 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 1905 1906 return err; 1907 } 1908 1909 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 1910 { 1911 struct drbd_peer_request *peer_req = 1912 container_of(w, struct drbd_peer_request, w); 1913 struct drbd_peer_device *peer_device = peer_req->peer_device; 1914 int err; 1915 1916 err = drbd_send_ack(peer_device, ack, peer_req); 1917 dec_unacked(peer_device->device); 1918 1919 return err; 1920 } 1921 1922 static int e_send_superseded(struct drbd_work *w, int unused) 1923 { 1924 return e_send_ack(w, P_SUPERSEDED); 1925 } 1926 1927 static int e_send_retry_write(struct drbd_work *w, int unused) 1928 { 1929 struct drbd_peer_request *peer_req = 1930 container_of(w, struct drbd_peer_request, w); 1931 struct drbd_connection *connection = peer_req->peer_device->connection; 1932 1933 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 1934 P_RETRY_WRITE : P_SUPERSEDED); 1935 } 1936 1937 static bool seq_greater(u32 a, u32 b) 1938 { 1939 /* 1940 * We assume 32-bit wrap-around here. 1941 * For 24-bit wrap-around, we would have to shift: 1942 * a <<= 8; b <<= 8; 1943 */ 1944 return (s32)a - (s32)b > 0; 1945 } 1946 1947 static u32 seq_max(u32 a, u32 b) 1948 { 1949 return seq_greater(a, b) ? a : b; 1950 } 1951 1952 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 1953 { 1954 struct drbd_device *device = peer_device->device; 1955 unsigned int newest_peer_seq; 1956 1957 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 1958 spin_lock(&device->peer_seq_lock); 1959 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 1960 device->peer_seq = newest_peer_seq; 1961 spin_unlock(&device->peer_seq_lock); 1962 /* wake up only if we actually changed device->peer_seq */ 1963 if (peer_seq == newest_peer_seq) 1964 wake_up(&device->seq_wait); 1965 } 1966 } 1967 1968 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 1969 { 1970 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 1971 } 1972 1973 /* maybe change sync_ee into interval trees as well? */ 1974 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 1975 { 1976 struct drbd_peer_request *rs_req; 1977 bool rv = 0; 1978 1979 spin_lock_irq(&device->resource->req_lock); 1980 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 1981 if (overlaps(peer_req->i.sector, peer_req->i.size, 1982 rs_req->i.sector, rs_req->i.size)) { 1983 rv = 1; 1984 break; 1985 } 1986 } 1987 spin_unlock_irq(&device->resource->req_lock); 1988 1989 return rv; 1990 } 1991 1992 /* Called from receive_Data. 1993 * Synchronize packets on sock with packets on msock. 1994 * 1995 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 1996 * packet traveling on msock, they are still processed in the order they have 1997 * been sent. 1998 * 1999 * Note: we don't care for Ack packets overtaking P_DATA packets. 
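* Worked example for the wrap-around handling in seq_greater() above (illustrative numbers only): with a = 5 and b = 4294967290 the unsigned counter has just wrapped; the signed difference (s32)a - (s32)b is 5 - (-6) = 11 > 0, so a is still correctly treated as the newer sequence number.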
2000 * 2001 * In case packet_seq is larger than device->peer_seq number, there are 2002 * outstanding packets on the msock. We wait for them to arrive. 2003 * In case we are the logically next packet, we update device->peer_seq 2004 * ourselves. Correctly handles 32bit wrap around. 2005 * 2006 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 2007 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 2008 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 2009 * 1<<9 == 512 seconds aka ages for the 32bit wrap around... 2010 * 2011 * returns 0 if we may process the packet, 2012 * -ERESTARTSYS if we were interrupted (by disconnect signal). */ 2013 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 2014 { 2015 struct drbd_device *device = peer_device->device; 2016 DEFINE_WAIT(wait); 2017 long timeout; 2018 int ret = 0, tp; 2019 2020 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 2021 return 0; 2022 2023 spin_lock(&device->peer_seq_lock); 2024 for (;;) { 2025 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 2026 device->peer_seq = seq_max(device->peer_seq, peer_seq); 2027 break; 2028 } 2029 2030 if (signal_pending(current)) { 2031 ret = -ERESTARTSYS; 2032 break; 2033 } 2034 2035 rcu_read_lock(); 2036 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries; 2037 rcu_read_unlock(); 2038 2039 if (!tp) 2040 break; 2041 2042 /* Only need to wait if two_primaries is enabled */ 2043 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2044 spin_unlock(&device->peer_seq_lock); 2045 rcu_read_lock(); 2046 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2047 rcu_read_unlock(); 2048 timeout = schedule_timeout(timeout); 2049 spin_lock(&device->peer_seq_lock); 2050 if (!timeout) { 2051 ret = -ETIMEDOUT; 2052 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2053 break; 2054 } 2055 } 2056 spin_unlock(&device->peer_seq_lock); 2057 finish_wait(&device->seq_wait, &wait); 2058 return ret; 2059 } 2060 2061 /* see also bio_flags_to_wire() 2062 * DRBD_REQ_*, because we need to semantically map the flags to data packet 2063 * flags and back. We may replicate to other kernel versions. */ 2064 static unsigned long wire_flags_to_bio(u32 dpf) 2065 { 2066 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2067 (dpf & DP_FUA ? REQ_FUA : 0) | 2068 (dpf & DP_FLUSH ? REQ_FLUSH : 0) | 2069 (dpf & DP_DISCARD ? 
REQ_DISCARD : 0); 2070 } 2071 2072 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2073 unsigned int size) 2074 { 2075 struct drbd_interval *i; 2076 2077 repeat: 2078 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2079 struct drbd_request *req; 2080 struct bio_and_error m; 2081 2082 if (!i->local) 2083 continue; 2084 req = container_of(i, struct drbd_request, i); 2085 if (!(req->rq_state & RQ_POSTPONED)) 2086 continue; 2087 req->rq_state &= ~RQ_POSTPONED; 2088 __req_mod(req, NEG_ACKED, &m); 2089 spin_unlock_irq(&device->resource->req_lock); 2090 if (m.bio) 2091 complete_master_bio(device, &m); 2092 spin_lock_irq(&device->resource->req_lock); 2093 goto repeat; 2094 } 2095 } 2096 2097 static int handle_write_conflicts(struct drbd_device *device, 2098 struct drbd_peer_request *peer_req) 2099 { 2100 struct drbd_connection *connection = peer_req->peer_device->connection; 2101 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2102 sector_t sector = peer_req->i.sector; 2103 const unsigned int size = peer_req->i.size; 2104 struct drbd_interval *i; 2105 bool equal; 2106 int err; 2107 2108 /* 2109 * Inserting the peer request into the write_requests tree will prevent 2110 * new conflicting local requests from being added. 2111 */ 2112 drbd_insert_interval(&device->write_requests, &peer_req->i); 2113 2114 repeat: 2115 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2116 if (i == &peer_req->i) 2117 continue; 2118 2119 if (!i->local) { 2120 /* 2121 * Our peer has sent a conflicting remote request; this 2122 * should not happen in a two-node setup. Wait for the 2123 * earlier peer request to complete. 2124 */ 2125 err = drbd_wait_misc(device, i); 2126 if (err) 2127 goto out; 2128 goto repeat; 2129 } 2130 2131 equal = i->sector == sector && i->size == size; 2132 if (resolve_conflicts) { 2133 /* 2134 * If the peer request is fully contained within the 2135 * overlapping request, it can be considered overwritten 2136 * and thus superseded; otherwise, it will be retried 2137 * once all overlapping requests have completed. 2138 */ 2139 bool superseded = i->sector <= sector && i->sector + 2140 (i->size >> 9) >= sector + (size >> 9); 2141 2142 if (!equal) 2143 drbd_alert(device, "Concurrent writes detected: " 2144 "local=%llus +%u, remote=%llus +%u, " 2145 "assuming %s came first\n", 2146 (unsigned long long)i->sector, i->size, 2147 (unsigned long long)sector, size, 2148 superseded ? "local" : "remote"); 2149 2150 inc_unacked(device); 2151 peer_req->w.cb = superseded ? e_send_superseded : 2152 e_send_retry_write; 2153 list_add_tail(&peer_req->w.list, &device->done_ee); 2154 wake_asender(connection); 2155 2156 err = -ENOENT; 2157 goto out; 2158 } else { 2159 struct drbd_request *req = 2160 container_of(i, struct drbd_request, i); 2161 2162 if (!equal) 2163 drbd_alert(device, "Concurrent writes detected: " 2164 "local=%llus +%u, remote=%llus +%u\n", 2165 (unsigned long long)i->sector, i->size, 2166 (unsigned long long)sector, size); 2167 2168 if (req->rq_state & RQ_LOCAL_PENDING || 2169 !(req->rq_state & RQ_POSTPONED)) { 2170 /* 2171 * Wait for the node with the discard flag to 2172 * decide if this request has been superseded 2173 * or needs to be retried. 2174 * Requests that have been superseded will 2175 * disappear from the write_requests tree. 2176 * 2177 * In addition, wait for the conflicting 2178 * request to finish locally before submitting 2179 * the conflicting peer request. 
2180 */ 2181 err = drbd_wait_misc(device, &req->i); 2182 if (err) { 2183 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2184 fail_postponed_requests(device, sector, size); 2185 goto out; 2186 } 2187 goto repeat; 2188 } 2189 /* 2190 * Remember to restart the conflicting requests after 2191 * the new peer request has completed. 2192 */ 2193 peer_req->flags |= EE_RESTART_REQUESTS; 2194 } 2195 } 2196 err = 0; 2197 2198 out: 2199 if (err) 2200 drbd_remove_epoch_entry_interval(device, peer_req); 2201 return err; 2202 } 2203 2204 /* mirrored write */ 2205 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2206 { 2207 struct drbd_peer_device *peer_device; 2208 struct drbd_device *device; 2209 sector_t sector; 2210 struct drbd_peer_request *peer_req; 2211 struct p_data *p = pi->data; 2212 u32 peer_seq = be32_to_cpu(p->seq_num); 2213 int rw = WRITE; 2214 u32 dp_flags; 2215 int err, tp; 2216 2217 peer_device = conn_peer_device(connection, pi->vnr); 2218 if (!peer_device) 2219 return -EIO; 2220 device = peer_device->device; 2221 2222 if (!get_ldev(device)) { 2223 int err2; 2224 2225 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2226 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2227 atomic_inc(&connection->current_epoch->epoch_size); 2228 err2 = drbd_drain_block(peer_device, pi->size); 2229 if (!err) 2230 err = err2; 2231 return err; 2232 } 2233 2234 /* 2235 * Corresponding put_ldev done either below (on various errors), or in 2236 * drbd_peer_request_endio, if we successfully submit the data at the 2237 * end of this function. 2238 */ 2239 2240 sector = be64_to_cpu(p->sector); 2241 peer_req = read_in_block(peer_device, p->block_id, sector, pi); 2242 if (!peer_req) { 2243 put_ldev(device); 2244 return -EIO; 2245 } 2246 2247 peer_req->w.cb = e_end_block; 2248 2249 dp_flags = be32_to_cpu(p->dp_flags); 2250 rw |= wire_flags_to_bio(dp_flags); 2251 if (pi->cmd == P_TRIM) { 2252 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev); 2253 peer_req->flags |= EE_IS_TRIM; 2254 if (!blk_queue_discard(q)) 2255 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT; 2256 D_ASSERT(peer_device, peer_req->i.size > 0); 2257 D_ASSERT(peer_device, rw & REQ_DISCARD); 2258 D_ASSERT(peer_device, peer_req->pages == NULL); 2259 } else if (peer_req->pages == NULL) { 2260 D_ASSERT(device, peer_req->i.size == 0); 2261 D_ASSERT(device, dp_flags & DP_FLUSH); 2262 } 2263 2264 if (dp_flags & DP_MAY_SET_IN_SYNC) 2265 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2266 2267 spin_lock(&connection->epoch_lock); 2268 peer_req->epoch = connection->current_epoch; 2269 atomic_inc(&peer_req->epoch->epoch_size); 2270 atomic_inc(&peer_req->epoch->active); 2271 spin_unlock(&connection->epoch_lock); 2272 2273 rcu_read_lock(); 2274 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2275 rcu_read_unlock(); 2276 if (tp) { 2277 peer_req->flags |= EE_IN_INTERVAL_TREE; 2278 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2279 if (err) 2280 goto out_interrupted; 2281 spin_lock_irq(&device->resource->req_lock); 2282 err = handle_write_conflicts(device, peer_req); 2283 if (err) { 2284 spin_unlock_irq(&device->resource->req_lock); 2285 if (err == -ENOENT) { 2286 put_ldev(device); 2287 return 0; 2288 } 2289 goto out_interrupted; 2290 } 2291 } else { 2292 update_peer_seq(peer_device, peer_seq); 2293 spin_lock_irq(&device->resource->req_lock); 2294 } 2295 /* if we use the zeroout fallback code, we process synchronously 2296 * and we wait for all pending 
requests, respectively wait for 2297 * active_ee to become empty in drbd_submit_peer_request(); 2298 * better not add ourselves here. */ 2299 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0) 2300 list_add(&peer_req->w.list, &device->active_ee); 2301 spin_unlock_irq(&device->resource->req_lock); 2302 2303 if (device->state.conn == C_SYNC_TARGET) 2304 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2305 2306 if (peer_device->connection->agreed_pro_version < 100) { 2307 rcu_read_lock(); 2308 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) { 2309 case DRBD_PROT_C: 2310 dp_flags |= DP_SEND_WRITE_ACK; 2311 break; 2312 case DRBD_PROT_B: 2313 dp_flags |= DP_SEND_RECEIVE_ACK; 2314 break; 2315 } 2316 rcu_read_unlock(); 2317 } 2318 2319 if (dp_flags & DP_SEND_WRITE_ACK) { 2320 peer_req->flags |= EE_SEND_WRITE_ACK; 2321 inc_unacked(device); 2322 /* corresponding dec_unacked() in e_end_block() 2323 * respective _drbd_clear_done_ee */ 2324 } 2325 2326 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2327 /* I really don't like it that the receiver thread 2328 * sends on the msock, but anyways */ 2329 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req); 2330 } 2331 2332 if (device->state.pdsk < D_INCONSISTENT) { 2333 /* In case we have the only disk of the cluster, */ 2334 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 2335 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2336 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2337 drbd_al_begin_io(device, &peer_req->i, true); 2338 } 2339 2340 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR); 2341 if (!err) 2342 return 0; 2343 2344 /* don't care for the reason here */ 2345 drbd_err(device, "submit failed, triggering re-connect\n"); 2346 spin_lock_irq(&device->resource->req_lock); 2347 list_del(&peer_req->w.list); 2348 drbd_remove_epoch_entry_interval(device, peer_req); 2349 spin_unlock_irq(&device->resource->req_lock); 2350 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) 2351 drbd_al_complete_io(device, &peer_req->i); 2352 2353 out_interrupted: 2354 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP); 2355 put_ldev(device); 2356 drbd_free_peer_req(device, peer_req); 2357 return err; 2358 } 2359 2360 /* We may throttle resync, if the lower device seems to be busy, 2361 * and current sync rate is above c_min_rate. 2362 * 2363 * To decide whether or not the lower device is busy, we use a scheme similar 2364 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2365 * (more than 64 sectors) of activity we cannot account for with our own resync 2366 * activity, it obviously is "busy". 2367 * 2368 * The current sync rate used here uses only the most recent two step marks, 2369 * to have a short time average so we can react faster. 
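* Rough example of that rate check (illustrative numbers, assuming the usual 4 KiB of data per bitmap bit): if the older of the two marks is dt = 2 seconds old and db = 2048 bitmap bits were cleaned since then, dbdt = Bit2KB(2048 / 2) = 4096 KiB/s; with c_min_rate configured to, say, 250 KiB/s we are well above the minimum rate and may throttle.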
2370 */ 2371 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector) 2372 { 2373 struct lc_element *tmp; 2374 bool throttle = true; 2375 2376 if (!drbd_rs_c_min_rate_throttle(device)) 2377 return false; 2378 2379 spin_lock_irq(&device->al_lock); 2380 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2381 if (tmp) { 2382 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2383 if (test_bit(BME_PRIORITY, &bm_ext->flags)) 2384 throttle = false; 2385 /* Do not slow down if app IO is already waiting for this extent */ 2386 } 2387 spin_unlock_irq(&device->al_lock); 2388 2389 return throttle; 2390 } 2391 2392 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) 2393 { 2394 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 2395 unsigned long db, dt, dbdt; 2396 unsigned int c_min_rate; 2397 int curr_events; 2398 2399 rcu_read_lock(); 2400 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2401 rcu_read_unlock(); 2402 2403 /* feature disabled? */ 2404 if (c_min_rate == 0) 2405 return false; 2406 2407 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + 2408 (int)part_stat_read(&disk->part0, sectors[1]) - 2409 atomic_read(&device->rs_sect_ev); 2410 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) { 2411 unsigned long rs_left; 2412 int i; 2413 2414 device->rs_last_events = curr_events; 2415 2416 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2417 * approx. */ 2418 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2419 2420 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2421 rs_left = device->ov_left; 2422 else 2423 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2424 2425 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2426 if (!dt) 2427 dt++; 2428 db = device->rs_mark_left[i] - rs_left; 2429 dbdt = Bit2KB(db/dt); 2430 2431 if (dbdt > c_min_rate) 2432 return true; 2433 } 2434 return false; 2435 } 2436 2437 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2438 { 2439 struct drbd_peer_device *peer_device; 2440 struct drbd_device *device; 2441 sector_t sector; 2442 sector_t capacity; 2443 struct drbd_peer_request *peer_req; 2444 struct digest_info *di = NULL; 2445 int size, verb; 2446 unsigned int fault_type; 2447 struct p_block_req *p = pi->data; 2448 2449 peer_device = conn_peer_device(connection, pi->vnr); 2450 if (!peer_device) 2451 return -EIO; 2452 device = peer_device->device; 2453 capacity = drbd_get_capacity(device->this_bdev); 2454 2455 sector = be64_to_cpu(p->sector); 2456 size = be32_to_cpu(p->blksize); 2457 2458 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2459 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2460 (unsigned long long)sector, size); 2461 return -EINVAL; 2462 } 2463 if (sector + (size>>9) > capacity) { 2464 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2465 (unsigned long long)sector, size); 2466 return -EINVAL; 2467 } 2468 2469 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2470 verb = 1; 2471 switch (pi->cmd) { 2472 case P_DATA_REQUEST: 2473 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2474 break; 2475 case P_RS_DATA_REQUEST: 2476 case P_CSUM_RS_REQUEST: 2477 case P_OV_REQUEST: 2478 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2479 break; 2480 case P_OV_REPLY: 2481 verb = 0; 2482 dec_rs_pending(device); 2483 drbd_send_ack_ex(peer_device, P_OV_RESULT, 
sector, size, ID_IN_SYNC); 2484 break; 2485 default: 2486 BUG(); 2487 } 2488 if (verb && __ratelimit(&drbd_ratelimit_state)) 2489 drbd_err(device, "Can not satisfy peer's read request, " 2490 "no local data.\n"); 2491 2492 /* drain possibly payload */ 2493 return drbd_drain_block(peer_device, pi->size); 2494 } 2495 2496 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2497 * "criss-cross" setup, that might cause write-out on some other DRBD, 2498 * which in turn might block on the other node at this very place. */ 2499 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, 2500 true /* has real payload */, GFP_NOIO); 2501 if (!peer_req) { 2502 put_ldev(device); 2503 return -ENOMEM; 2504 } 2505 2506 switch (pi->cmd) { 2507 case P_DATA_REQUEST: 2508 peer_req->w.cb = w_e_end_data_req; 2509 fault_type = DRBD_FAULT_DT_RD; 2510 /* application IO, don't drbd_rs_begin_io */ 2511 goto submit; 2512 2513 case P_RS_DATA_REQUEST: 2514 peer_req->w.cb = w_e_end_rsdata_req; 2515 fault_type = DRBD_FAULT_RS_RD; 2516 /* used in the sector offset progress display */ 2517 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2518 break; 2519 2520 case P_OV_REPLY: 2521 case P_CSUM_RS_REQUEST: 2522 fault_type = DRBD_FAULT_RS_RD; 2523 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2524 if (!di) 2525 goto out_free_e; 2526 2527 di->digest_size = pi->size; 2528 di->digest = (((char *)di)+sizeof(struct digest_info)); 2529 2530 peer_req->digest = di; 2531 peer_req->flags |= EE_HAS_DIGEST; 2532 2533 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2534 goto out_free_e; 2535 2536 if (pi->cmd == P_CSUM_RS_REQUEST) { 2537 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2538 peer_req->w.cb = w_e_end_csum_rs_req; 2539 /* used in the sector offset progress display */ 2540 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2541 } else if (pi->cmd == P_OV_REPLY) { 2542 /* track progress, we may need to throttle */ 2543 atomic_add(size >> 9, &device->rs_sect_in); 2544 peer_req->w.cb = w_e_end_ov_reply; 2545 dec_rs_pending(device); 2546 /* drbd_rs_begin_io done when we sent this request, 2547 * but accounting still needs to be done. */ 2548 goto submit_for_resync; 2549 } 2550 break; 2551 2552 case P_OV_REQUEST: 2553 if (device->ov_start_sector == ~(sector_t)0 && 2554 peer_device->connection->agreed_pro_version >= 90) { 2555 unsigned long now = jiffies; 2556 int i; 2557 device->ov_start_sector = sector; 2558 device->ov_position = sector; 2559 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2560 device->rs_total = device->ov_left; 2561 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2562 device->rs_mark_left[i] = device->ov_left; 2563 device->rs_mark_time[i] = now; 2564 } 2565 drbd_info(device, "Online Verify start sector: %llu\n", 2566 (unsigned long long)sector); 2567 } 2568 peer_req->w.cb = w_e_end_ov_req; 2569 fault_type = DRBD_FAULT_RS_RD; 2570 break; 2571 2572 default: 2573 BUG(); 2574 } 2575 2576 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2577 * wrt the receiver, but it is not as straightforward as it may seem. 2578 * Various places in the resync start and stop logic assume resync 2579 * requests are processed in order, requeuing this on the worker thread 2580 * introduces a bunch of new code for synchronization between threads. 
2581 * 2582 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2583 * "forever", throttling after drbd_rs_begin_io will lock that extent 2584 * for application writes for the same time. For now, just throttle 2585 * here, where the rest of the code expects the receiver to sleep for 2586 * a while, anyways. 2587 */ 2588 2589 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2590 * this defers syncer requests for some time, before letting at least 2591 * on request through. The resync controller on the receiving side 2592 * will adapt to the incoming rate accordingly. 2593 * 2594 * We cannot throttle here if remote is Primary/SyncTarget: 2595 * we would also throttle its application reads. 2596 * In that case, throttling is done on the SyncTarget only. 2597 */ 2598 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector)) 2599 schedule_timeout_uninterruptible(HZ/10); 2600 if (drbd_rs_begin_io(device, sector)) 2601 goto out_free_e; 2602 2603 submit_for_resync: 2604 atomic_add(size >> 9, &device->rs_sect_ev); 2605 2606 submit: 2607 inc_unacked(device); 2608 spin_lock_irq(&device->resource->req_lock); 2609 list_add_tail(&peer_req->w.list, &device->read_ee); 2610 spin_unlock_irq(&device->resource->req_lock); 2611 2612 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0) 2613 return 0; 2614 2615 /* don't care for the reason here */ 2616 drbd_err(device, "submit failed, triggering re-connect\n"); 2617 spin_lock_irq(&device->resource->req_lock); 2618 list_del(&peer_req->w.list); 2619 spin_unlock_irq(&device->resource->req_lock); 2620 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2621 2622 out_free_e: 2623 put_ldev(device); 2624 drbd_free_peer_req(device, peer_req); 2625 return -EIO; 2626 } 2627 2628 /** 2629 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2630 */ 2631 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2632 { 2633 struct drbd_device *device = peer_device->device; 2634 int self, peer, rv = -100; 2635 unsigned long ch_self, ch_peer; 2636 enum drbd_after_sb_p after_sb_0p; 2637 2638 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2639 peer = device->p_uuid[UI_BITMAP] & 1; 2640 2641 ch_peer = device->p_uuid[UI_SIZE]; 2642 ch_self = device->comm_bm_set; 2643 2644 rcu_read_lock(); 2645 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2646 rcu_read_unlock(); 2647 switch (after_sb_0p) { 2648 case ASB_CONSENSUS: 2649 case ASB_DISCARD_SECONDARY: 2650 case ASB_CALL_HELPER: 2651 case ASB_VIOLENTLY: 2652 drbd_err(device, "Configuration error.\n"); 2653 break; 2654 case ASB_DISCONNECT: 2655 break; 2656 case ASB_DISCARD_YOUNGER_PRI: 2657 if (self == 0 && peer == 1) { 2658 rv = -1; 2659 break; 2660 } 2661 if (self == 1 && peer == 0) { 2662 rv = 1; 2663 break; 2664 } 2665 /* Else fall through to one of the other strategies... */ 2666 case ASB_DISCARD_OLDER_PRI: 2667 if (self == 0 && peer == 1) { 2668 rv = 1; 2669 break; 2670 } 2671 if (self == 1 && peer == 0) { 2672 rv = -1; 2673 break; 2674 } 2675 /* Else fall through to one of the other strategies... */ 2676 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2677 "Using discard-least-changes instead\n"); 2678 case ASB_DISCARD_ZERO_CHG: 2679 if (ch_peer == 0 && ch_self == 0) { 2680 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2681 ? 
-1 : 1; 2682 break; 2683 } else { 2684 if (ch_peer == 0) { rv = 1; break; } 2685 if (ch_self == 0) { rv = -1; break; } 2686 } 2687 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2688 break; 2689 case ASB_DISCARD_LEAST_CHG: 2690 if (ch_self < ch_peer) 2691 rv = -1; 2692 else if (ch_self > ch_peer) 2693 rv = 1; 2694 else /* ( ch_self == ch_peer ) */ 2695 /* Well, then use something else. */ 2696 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2697 ? -1 : 1; 2698 break; 2699 case ASB_DISCARD_LOCAL: 2700 rv = -1; 2701 break; 2702 case ASB_DISCARD_REMOTE: 2703 rv = 1; 2704 } 2705 2706 return rv; 2707 } 2708 2709 /** 2710 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2711 */ 2712 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2713 { 2714 struct drbd_device *device = peer_device->device; 2715 int hg, rv = -100; 2716 enum drbd_after_sb_p after_sb_1p; 2717 2718 rcu_read_lock(); 2719 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2720 rcu_read_unlock(); 2721 switch (after_sb_1p) { 2722 case ASB_DISCARD_YOUNGER_PRI: 2723 case ASB_DISCARD_OLDER_PRI: 2724 case ASB_DISCARD_LEAST_CHG: 2725 case ASB_DISCARD_LOCAL: 2726 case ASB_DISCARD_REMOTE: 2727 case ASB_DISCARD_ZERO_CHG: 2728 drbd_err(device, "Configuration error.\n"); 2729 break; 2730 case ASB_DISCONNECT: 2731 break; 2732 case ASB_CONSENSUS: 2733 hg = drbd_asb_recover_0p(peer_device); 2734 if (hg == -1 && device->state.role == R_SECONDARY) 2735 rv = hg; 2736 if (hg == 1 && device->state.role == R_PRIMARY) 2737 rv = hg; 2738 break; 2739 case ASB_VIOLENTLY: 2740 rv = drbd_asb_recover_0p(peer_device); 2741 break; 2742 case ASB_DISCARD_SECONDARY: 2743 return device->state.role == R_PRIMARY ? 1 : -1; 2744 case ASB_CALL_HELPER: 2745 hg = drbd_asb_recover_0p(peer_device); 2746 if (hg == -1 && device->state.role == R_PRIMARY) { 2747 enum drbd_state_rv rv2; 2748 2749 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2750 * we might be here in C_WF_REPORT_PARAMS which is transient. 2751 * we do not need to wait for the after state change work either. 
*/ 2752 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2753 if (rv2 != SS_SUCCESS) { 2754 drbd_khelper(device, "pri-lost-after-sb"); 2755 } else { 2756 drbd_warn(device, "Successfully gave up primary role.\n"); 2757 rv = hg; 2758 } 2759 } else 2760 rv = hg; 2761 } 2762 2763 return rv; 2764 } 2765 2766 /** 2767 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2768 */ 2769 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2770 { 2771 struct drbd_device *device = peer_device->device; 2772 int hg, rv = -100; 2773 enum drbd_after_sb_p after_sb_2p; 2774 2775 rcu_read_lock(); 2776 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2777 rcu_read_unlock(); 2778 switch (after_sb_2p) { 2779 case ASB_DISCARD_YOUNGER_PRI: 2780 case ASB_DISCARD_OLDER_PRI: 2781 case ASB_DISCARD_LEAST_CHG: 2782 case ASB_DISCARD_LOCAL: 2783 case ASB_DISCARD_REMOTE: 2784 case ASB_CONSENSUS: 2785 case ASB_DISCARD_SECONDARY: 2786 case ASB_DISCARD_ZERO_CHG: 2787 drbd_err(device, "Configuration error.\n"); 2788 break; 2789 case ASB_VIOLENTLY: 2790 rv = drbd_asb_recover_0p(peer_device); 2791 break; 2792 case ASB_DISCONNECT: 2793 break; 2794 case ASB_CALL_HELPER: 2795 hg = drbd_asb_recover_0p(peer_device); 2796 if (hg == -1) { 2797 enum drbd_state_rv rv2; 2798 2799 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2800 * we might be here in C_WF_REPORT_PARAMS which is transient. 2801 * we do not need to wait for the after state change work either. */ 2802 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2803 if (rv2 != SS_SUCCESS) { 2804 drbd_khelper(device, "pri-lost-after-sb"); 2805 } else { 2806 drbd_warn(device, "Successfully gave up primary role.\n"); 2807 rv = hg; 2808 } 2809 } else 2810 rv = hg; 2811 } 2812 2813 return rv; 2814 } 2815 2816 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 2817 u64 bits, u64 flags) 2818 { 2819 if (!uuid) { 2820 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 2821 return; 2822 } 2823 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2824 text, 2825 (unsigned long long)uuid[UI_CURRENT], 2826 (unsigned long long)uuid[UI_BITMAP], 2827 (unsigned long long)uuid[UI_HISTORY_START], 2828 (unsigned long long)uuid[UI_HISTORY_END], 2829 (unsigned long long)bits, 2830 (unsigned long long)flags); 2831 } 2832 2833 /* 2834 100 after split brain try auto recover 2835 2 C_SYNC_SOURCE set BitMap 2836 1 C_SYNC_SOURCE use BitMap 2837 0 no Sync 2838 -1 C_SYNC_TARGET use BitMap 2839 -2 C_SYNC_TARGET set BitMap 2840 -100 after split brain, disconnect 2841 -1000 unrelated data 2842 -1091 requires proto 91 2843 -1096 requires proto 96 2844 */ 2845 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local) 2846 { 2847 u64 self, peer; 2848 int i, j; 2849 2850 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2851 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2852 2853 *rule_nr = 10; 2854 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2855 return 0; 2856 2857 *rule_nr = 20; 2858 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2859 peer != UUID_JUST_CREATED) 2860 return -2; 2861 2862 *rule_nr = 30; 2863 if (self != UUID_JUST_CREATED && 2864 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2865 return 2; 2866 2867 if (self == peer) { 2868 int rct, dc; /* roles at crash time */ 2869 2870 if (device->p_uuid[UI_BITMAP] == (u64)0 && 
device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2871 2872 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2873 return -1091; 2874 2875 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2876 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2877 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2878 drbd_uuid_move_history(device); 2879 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 2880 device->ldev->md.uuid[UI_BITMAP] = 0; 2881 2882 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2883 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 2884 *rule_nr = 34; 2885 } else { 2886 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 2887 *rule_nr = 36; 2888 } 2889 2890 return 1; 2891 } 2892 2893 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 2894 2895 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2896 return -1091; 2897 2898 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 2899 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 2900 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 2901 2902 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 2903 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 2904 device->p_uuid[UI_BITMAP] = 0UL; 2905 2906 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2907 *rule_nr = 35; 2908 } else { 2909 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 2910 *rule_nr = 37; 2911 } 2912 2913 return -1; 2914 } 2915 2916 /* Common power [off|failure] */ 2917 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 2918 (device->p_uuid[UI_FLAGS] & 2); 2919 /* lowest bit is set when we were primary, 2920 * next bit (weight 2) is set when peer was primary */ 2921 *rule_nr = 40; 2922 2923 switch (rct) { 2924 case 0: /* !self_pri && !peer_pri */ return 0; 2925 case 1: /* self_pri && !peer_pri */ return 1; 2926 case 2: /* !self_pri && peer_pri */ return -1; 2927 case 3: /* self_pri && peer_pri */ 2928 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags); 2929 return dc ? -1 : 1; 2930 } 2931 } 2932 2933 *rule_nr = 50; 2934 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 2935 if (self == peer) 2936 return -1; 2937 2938 *rule_nr = 51; 2939 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 2940 if (self == peer) { 2941 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 2942 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 2943 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 2944 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 2945 /* The last P_SYNC_UUID did not get though. Undo the last start of 2946 resync as sync source modifications of the peer's UUIDs. 
*/ 2947 2948 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2949 return -1091; 2950 2951 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 2952 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 2953 2954 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 2955 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 2956 2957 return -1; 2958 } 2959 } 2960 2961 *rule_nr = 60; 2962 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2963 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 2964 peer = device->p_uuid[i] & ~((u64)1); 2965 if (self == peer) 2966 return -2; 2967 } 2968 2969 *rule_nr = 70; 2970 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 2971 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2972 if (self == peer) 2973 return 1; 2974 2975 *rule_nr = 71; 2976 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 2977 if (self == peer) { 2978 if (first_peer_device(device)->connection->agreed_pro_version < 96 ? 2979 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 2980 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 2981 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 2982 /* The last P_SYNC_UUID did not get though. Undo the last start of 2983 resync as sync source modifications of our UUIDs. */ 2984 2985 if (first_peer_device(device)->connection->agreed_pro_version < 91) 2986 return -1091; 2987 2988 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 2989 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 2990 2991 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 2992 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 2993 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 2994 2995 return 1; 2996 } 2997 } 2998 2999 3000 *rule_nr = 80; 3001 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3002 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3003 self = device->ldev->md.uuid[i] & ~((u64)1); 3004 if (self == peer) 3005 return 2; 3006 } 3007 3008 *rule_nr = 90; 3009 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3010 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3011 if (self == peer && self != ((u64)0)) 3012 return 100; 3013 3014 *rule_nr = 100; 3015 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3016 self = device->ldev->md.uuid[i] & ~((u64)1); 3017 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3018 peer = device->p_uuid[j] & ~((u64)1); 3019 if (self == peer) 3020 return -100; 3021 } 3022 } 3023 3024 return -1000; 3025 } 3026 3027 /* drbd_sync_handshake() returns the new conn state on success, or 3028 CONN_MASK (-1) on failure. 
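   The sync decision itself (hg) comes from drbd_uuid_compare(); see the table right above that function for the meaning of the individual values (100 .. -1000).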
3029 */ 3030 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 3031 enum drbd_role peer_role, 3032 enum drbd_disk_state peer_disk) __must_hold(local) 3033 { 3034 struct drbd_device *device = peer_device->device; 3035 enum drbd_conns rv = C_MASK; 3036 enum drbd_disk_state mydisk; 3037 struct net_conf *nc; 3038 int hg, rule_nr, rr_conflict, tentative; 3039 3040 mydisk = device->state.disk; 3041 if (mydisk == D_NEGOTIATING) 3042 mydisk = device->new_state_tmp.disk; 3043 3044 drbd_info(device, "drbd_sync_handshake:\n"); 3045 3046 spin_lock_irq(&device->ldev->md.uuid_lock); 3047 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 3048 drbd_uuid_dump(device, "peer", device->p_uuid, 3049 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3050 3051 hg = drbd_uuid_compare(device, &rule_nr); 3052 spin_unlock_irq(&device->ldev->md.uuid_lock); 3053 3054 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 3055 3056 if (hg == -1000) { 3057 drbd_alert(device, "Unrelated data, aborting!\n"); 3058 return C_MASK; 3059 } 3060 if (hg < -1000) { 3061 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3062 return C_MASK; 3063 } 3064 3065 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3066 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3067 int f = (hg == -100) || abs(hg) == 2; 3068 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3069 if (f) 3070 hg = hg*2; 3071 drbd_info(device, "Becoming sync %s due to disk states.\n", 3072 hg > 0 ? "source" : "target"); 3073 } 3074 3075 if (abs(hg) == 100) 3076 drbd_khelper(device, "initial-split-brain"); 3077 3078 rcu_read_lock(); 3079 nc = rcu_dereference(peer_device->connection->net_conf); 3080 3081 if (hg == 100 || (hg == -100 && nc->always_asbp)) { 3082 int pcount = (device->state.role == R_PRIMARY) 3083 + (peer_role == R_PRIMARY); 3084 int forced = (hg == -100); 3085 3086 switch (pcount) { 3087 case 0: 3088 hg = drbd_asb_recover_0p(peer_device); 3089 break; 3090 case 1: 3091 hg = drbd_asb_recover_1p(peer_device); 3092 break; 3093 case 2: 3094 hg = drbd_asb_recover_2p(peer_device); 3095 break; 3096 } 3097 if (abs(hg) < 100) { 3098 drbd_warn(device, "Split-Brain detected, %d primaries, " 3099 "automatically solved. Sync from %s node\n", 3100 pcount, (hg < 0) ? "peer" : "this"); 3101 if (forced) { 3102 drbd_warn(device, "Doing a full sync, since" 3103 " UUIDs where ambiguous.\n"); 3104 hg = hg*2; 3105 } 3106 } 3107 } 3108 3109 if (hg == -100) { 3110 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3111 hg = -1; 3112 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3113 hg = 1; 3114 3115 if (abs(hg) < 100) 3116 drbd_warn(device, "Split-Brain detected, manually solved. " 3117 "Sync from %s node\n", 3118 (hg < 0) ? "peer" : "this"); 3119 } 3120 rr_conflict = nc->rr_conflict; 3121 tentative = nc->tentative; 3122 rcu_read_unlock(); 3123 3124 if (hg == -100) { 3125 /* FIXME this log message is not correct if we end up here 3126 * after an attempted attach on a diskless node. 3127 * We just refuse to attach -- well, we drop the "connection" 3128 * to that disk, in a way... 
*/ 3129 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3130 drbd_khelper(device, "split-brain"); 3131 return C_MASK; 3132 } 3133 3134 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3135 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3136 return C_MASK; 3137 } 3138 3139 if (hg < 0 && /* by intention we do not use mydisk here. */ 3140 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3141 switch (rr_conflict) { 3142 case ASB_CALL_HELPER: 3143 drbd_khelper(device, "pri-lost"); 3144 /* fall through */ 3145 case ASB_DISCONNECT: 3146 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3147 return C_MASK; 3148 case ASB_VIOLENTLY: 3149 drbd_warn(device, "Becoming SyncTarget, violating the stable-data " 3150 "assumption\n"); 3151 } 3152 } 3153 3154 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3155 if (hg == 0) 3156 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3157 else 3158 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n", 3159 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3160 abs(hg) >= 2 ? "full" : "bit-map based"); 3161 return C_MASK; 3162 } 3163 3164 if (abs(hg) >= 2) { 3165 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3166 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3167 BM_LOCKED_SET_ALLOWED)) 3168 return C_MASK; 3169 } 3170 3171 if (hg > 0) { /* become sync source. */ 3172 rv = C_WF_BITMAP_S; 3173 } else if (hg < 0) { /* become sync target */ 3174 rv = C_WF_BITMAP_T; 3175 } else { 3176 rv = C_CONNECTED; 3177 if (drbd_bm_total_weight(device)) { 3178 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3179 drbd_bm_total_weight(device)); 3180 } 3181 } 3182 3183 return rv; 3184 } 3185 3186 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3187 { 3188 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3189 if (peer == ASB_DISCARD_REMOTE) 3190 return ASB_DISCARD_LOCAL; 3191 3192 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 3193 if (peer == ASB_DISCARD_LOCAL) 3194 return ASB_DISCARD_REMOTE; 3195 3196 /* everything else is valid if they are equal on both sides.
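* The first two branches above handle the one asymmetric pair: e.g. a peer configured with discard-remote corresponds to discard-local on this node, which is what the compatibility checks in receive_protocol() compare against.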
*/ 3197 return peer; 3198 } 3199 3200 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3201 { 3202 struct p_protocol *p = pi->data; 3203 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3204 int p_proto, p_discard_my_data, p_two_primaries, cf; 3205 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3206 char integrity_alg[SHARED_SECRET_MAX] = ""; 3207 struct crypto_hash *peer_integrity_tfm = NULL; 3208 void *int_dig_in = NULL, *int_dig_vv = NULL; 3209 3210 p_proto = be32_to_cpu(p->protocol); 3211 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3212 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3213 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3214 p_two_primaries = be32_to_cpu(p->two_primaries); 3215 cf = be32_to_cpu(p->conn_flags); 3216 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3217 3218 if (connection->agreed_pro_version >= 87) { 3219 int err; 3220 3221 if (pi->size > sizeof(integrity_alg)) 3222 return -EIO; 3223 err = drbd_recv_all(connection, integrity_alg, pi->size); 3224 if (err) 3225 return err; 3226 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3227 } 3228 3229 if (pi->cmd != P_PROTOCOL_UPDATE) { 3230 clear_bit(CONN_DRY_RUN, &connection->flags); 3231 3232 if (cf & CF_DRY_RUN) 3233 set_bit(CONN_DRY_RUN, &connection->flags); 3234 3235 rcu_read_lock(); 3236 nc = rcu_dereference(connection->net_conf); 3237 3238 if (p_proto != nc->wire_protocol) { 3239 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3240 goto disconnect_rcu_unlock; 3241 } 3242 3243 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3244 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3245 goto disconnect_rcu_unlock; 3246 } 3247 3248 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3249 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3250 goto disconnect_rcu_unlock; 3251 } 3252 3253 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3254 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3255 goto disconnect_rcu_unlock; 3256 } 3257 3258 if (p_discard_my_data && nc->discard_my_data) { 3259 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3260 goto disconnect_rcu_unlock; 3261 } 3262 3263 if (p_two_primaries != nc->two_primaries) { 3264 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3265 goto disconnect_rcu_unlock; 3266 } 3267 3268 if (strcmp(integrity_alg, nc->integrity_alg)) { 3269 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3270 goto disconnect_rcu_unlock; 3271 } 3272 3273 rcu_read_unlock(); 3274 } 3275 3276 if (integrity_alg[0]) { 3277 int hash_size; 3278 3279 /* 3280 * We can only change the peer data integrity algorithm 3281 * here. Changing our own data integrity algorithm 3282 * requires that we send a P_PROTOCOL_UPDATE packet at 3283 * the same time; otherwise, the peer has no way to 3284 * tell between which packets the algorithm should 3285 * change. 
3286 */ 3287 3288 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC); 3289 if (!peer_integrity_tfm) { 3290 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3291 integrity_alg); 3292 goto disconnect; 3293 } 3294 3295 hash_size = crypto_hash_digestsize(peer_integrity_tfm); 3296 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3297 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3298 if (!(int_dig_in && int_dig_vv)) { 3299 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3300 goto disconnect; 3301 } 3302 } 3303 3304 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); 3305 if (!new_net_conf) { 3306 drbd_err(connection, "Allocation of new net_conf failed\n"); 3307 goto disconnect; 3308 } 3309 3310 mutex_lock(&connection->data.mutex); 3311 mutex_lock(&connection->resource->conf_update); 3312 old_net_conf = connection->net_conf; 3313 *new_net_conf = *old_net_conf; 3314 3315 new_net_conf->wire_protocol = p_proto; 3316 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3317 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3318 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3319 new_net_conf->two_primaries = p_two_primaries; 3320 3321 rcu_assign_pointer(connection->net_conf, new_net_conf); 3322 mutex_unlock(&connection->resource->conf_update); 3323 mutex_unlock(&connection->data.mutex); 3324 3325 crypto_free_hash(connection->peer_integrity_tfm); 3326 kfree(connection->int_dig_in); 3327 kfree(connection->int_dig_vv); 3328 connection->peer_integrity_tfm = peer_integrity_tfm; 3329 connection->int_dig_in = int_dig_in; 3330 connection->int_dig_vv = int_dig_vv; 3331 3332 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3333 drbd_info(connection, "peer data-integrity-alg: %s\n", 3334 integrity_alg[0] ? integrity_alg : "(none)"); 3335 3336 synchronize_rcu(); 3337 kfree(old_net_conf); 3338 return 0; 3339 3340 disconnect_rcu_unlock: 3341 rcu_read_unlock(); 3342 disconnect: 3343 crypto_free_hash(peer_integrity_tfm); 3344 kfree(int_dig_in); 3345 kfree(int_dig_vv); 3346 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3347 return -EIO; 3348 } 3349 3350 /* helper function 3351 * input: alg name, feature name 3352 * return: NULL (alg name was "") 3353 * ERR_PTR(error) if something goes wrong 3354 * or the crypto hash ptr, if it worked out ok. 
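* Callers therefore check with IS_ERR() rather than for NULL only; receive_SyncParam() below does e.g. verify_tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg"); followed by an IS_ERR(verify_tfm) check, since a failed allocation is reported via ERR_PTR() while NULL simply means the feature is unused.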
*/ 3355 static 3356 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, 3357 const char *alg, const char *name) 3358 { 3359 struct crypto_hash *tfm; 3360 3361 if (!alg[0]) 3362 return NULL; 3363 3364 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 3365 if (IS_ERR(tfm)) { 3366 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3367 alg, name, PTR_ERR(tfm)); 3368 return tfm; 3369 } 3370 return tfm; 3371 } 3372 3373 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3374 { 3375 void *buffer = connection->data.rbuf; 3376 int size = pi->size; 3377 3378 while (size) { 3379 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3380 s = drbd_recv(connection, buffer, s); 3381 if (s <= 0) { 3382 if (s < 0) 3383 return s; 3384 break; 3385 } 3386 size -= s; 3387 } 3388 if (size) 3389 return -EIO; 3390 return 0; 3391 } 3392 3393 /* 3394 * config_unknown_volume - device configuration command for unknown volume 3395 * 3396 * When a device is added to an existing connection, the node on which the 3397 * device is added first will send configuration commands to its peer but the 3398 * peer will not know about the device yet. It will warn and ignore these 3399 * commands. Once the device is added on the second node, the second node will 3400 * send the same device configuration commands, but in the other direction. 3401 * 3402 * (We can also end up here if drbd is misconfigured.) 3403 */ 3404 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3405 { 3406 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3407 cmdname(pi->cmd), pi->vnr); 3408 return ignore_remaining_packet(connection, pi); 3409 } 3410 3411 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3412 { 3413 struct drbd_peer_device *peer_device; 3414 struct drbd_device *device; 3415 struct p_rs_param_95 *p; 3416 unsigned int header_size, data_size, exp_max_sz; 3417 struct crypto_hash *verify_tfm = NULL; 3418 struct crypto_hash *csums_tfm = NULL; 3419 struct net_conf *old_net_conf, *new_net_conf = NULL; 3420 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3421 const int apv = connection->agreed_pro_version; 3422 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3423 int fifo_size = 0; 3424 int err; 3425 3426 peer_device = conn_peer_device(connection, pi->vnr); 3427 if (!peer_device) 3428 return config_unknown_volume(connection, pi); 3429 device = peer_device->device; 3430 3431 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3432 : apv == 88 ? sizeof(struct p_rs_param) 3433 + SHARED_SECRET_MAX 3434 : apv <= 94 ? 
sizeof(struct p_rs_param_89) 3435 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3436 3437 if (pi->size > exp_max_sz) { 3438 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3439 pi->size, exp_max_sz); 3440 return -EIO; 3441 } 3442 3443 if (apv <= 88) { 3444 header_size = sizeof(struct p_rs_param); 3445 data_size = pi->size - header_size; 3446 } else if (apv <= 94) { 3447 header_size = sizeof(struct p_rs_param_89); 3448 data_size = pi->size - header_size; 3449 D_ASSERT(device, data_size == 0); 3450 } else { 3451 header_size = sizeof(struct p_rs_param_95); 3452 data_size = pi->size - header_size; 3453 D_ASSERT(device, data_size == 0); 3454 } 3455 3456 /* initialize verify_alg and csums_alg */ 3457 p = pi->data; 3458 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 3459 3460 err = drbd_recv_all(peer_device->connection, p, header_size); 3461 if (err) 3462 return err; 3463 3464 mutex_lock(&connection->resource->conf_update); 3465 old_net_conf = peer_device->connection->net_conf; 3466 if (get_ldev(device)) { 3467 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3468 if (!new_disk_conf) { 3469 put_ldev(device); 3470 mutex_unlock(&connection->resource->conf_update); 3471 drbd_err(device, "Allocation of new disk_conf failed\n"); 3472 return -ENOMEM; 3473 } 3474 3475 old_disk_conf = device->ldev->disk_conf; 3476 *new_disk_conf = *old_disk_conf; 3477 3478 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3479 } 3480 3481 if (apv >= 88) { 3482 if (apv == 88) { 3483 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3484 drbd_err(device, "verify-alg of wrong size, " 3485 "peer wants %u, accepting only up to %u byte\n", 3486 data_size, SHARED_SECRET_MAX); 3487 err = -EIO; 3488 goto reconnect; 3489 } 3490 3491 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3492 if (err) 3493 goto reconnect; 3494 /* we expect NUL terminated string */ 3495 /* but just in case someone tries to be evil */ 3496 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3497 p->verify_alg[data_size-1] = 0; 3498 3499 } else /* apv >= 89 */ { 3500 /* we still expect NUL terminated strings */ 3501 /* but just in case someone tries to be evil */ 3502 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3503 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3504 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3505 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3506 } 3507 3508 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3509 if (device->state.conn == C_WF_REPORT_PARAMS) { 3510 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3511 old_net_conf->verify_alg, p->verify_alg); 3512 goto disconnect; 3513 } 3514 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3515 p->verify_alg, "verify-alg"); 3516 if (IS_ERR(verify_tfm)) { 3517 verify_tfm = NULL; 3518 goto disconnect; 3519 } 3520 } 3521 3522 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3523 if (device->state.conn == C_WF_REPORT_PARAMS) { 3524 drbd_err(device, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 3525 old_net_conf->csums_alg, p->csums_alg); 3526 goto disconnect; 3527 } 3528 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3529 p->csums_alg, "csums-alg"); 3530 if (IS_ERR(csums_tfm)) { 3531 csums_tfm = NULL; 3532 goto disconnect; 3533 } 3534 } 3535 3536 if (apv > 94 && new_disk_conf) { 3537 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3538 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3539 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3540 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3541 3542 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3543 if (fifo_size != device->rs_plan_s->size) { 3544 new_plan = fifo_alloc(fifo_size); 3545 if (!new_plan) { 3546 drbd_err(device, "kmalloc of fifo_buffer failed"); 3547 put_ldev(device); 3548 goto disconnect; 3549 } 3550 } 3551 } 3552 3553 if (verify_tfm || csums_tfm) { 3554 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); 3555 if (!new_net_conf) { 3556 drbd_err(device, "Allocation of new net_conf failed\n"); 3557 goto disconnect; 3558 } 3559 3560 *new_net_conf = *old_net_conf; 3561 3562 if (verify_tfm) { 3563 strcpy(new_net_conf->verify_alg, p->verify_alg); 3564 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3565 crypto_free_hash(peer_device->connection->verify_tfm); 3566 peer_device->connection->verify_tfm = verify_tfm; 3567 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3568 } 3569 if (csums_tfm) { 3570 strcpy(new_net_conf->csums_alg, p->csums_alg); 3571 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3572 crypto_free_hash(peer_device->connection->csums_tfm); 3573 peer_device->connection->csums_tfm = csums_tfm; 3574 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3575 } 3576 rcu_assign_pointer(connection->net_conf, new_net_conf); 3577 } 3578 } 3579 3580 if (new_disk_conf) { 3581 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3582 put_ldev(device); 3583 } 3584 3585 if (new_plan) { 3586 old_plan = device->rs_plan_s; 3587 rcu_assign_pointer(device->rs_plan_s, new_plan); 3588 } 3589 3590 mutex_unlock(&connection->resource->conf_update); 3591 synchronize_rcu(); 3592 if (new_net_conf) 3593 kfree(old_net_conf); 3594 kfree(old_disk_conf); 3595 kfree(old_plan); 3596 3597 return 0; 3598 3599 reconnect: 3600 if (new_disk_conf) { 3601 put_ldev(device); 3602 kfree(new_disk_conf); 3603 } 3604 mutex_unlock(&connection->resource->conf_update); 3605 return -EIO; 3606 3607 disconnect: 3608 kfree(new_plan); 3609 if (new_disk_conf) { 3610 put_ldev(device); 3611 kfree(new_disk_conf); 3612 } 3613 mutex_unlock(&connection->resource->conf_update); 3614 /* just for completeness: actually not needed, 3615 * as this is not reached if csums_tfm was ok. */ 3616 crypto_free_hash(csums_tfm); 3617 /* but free the verify_tfm again, if csums_tfm did not work out */ 3618 crypto_free_hash(verify_tfm); 3619 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3620 return -EIO; 3621 } 3622 3623 /* warn if the arguments differ by more than 12.5% */ 3624 static void warn_if_differ_considerably(struct drbd_device *device, 3625 const char *s, sector_t a, sector_t b) 3626 { 3627 sector_t d; 3628 if (a == 0 || b == 0) 3629 return; 3630 d = (a > b) ? (a - b) : (b - a); 3631 if (d > (a>>3) || d > (b>>3)) 3632 drbd_warn(device, "Considerable difference in %s: %llus vs. 
%llus\n", s, 3633 (unsigned long long)a, (unsigned long long)b); 3634 } 3635 3636 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3637 { 3638 struct drbd_peer_device *peer_device; 3639 struct drbd_device *device; 3640 struct p_sizes *p = pi->data; 3641 enum determine_dev_size dd = DS_UNCHANGED; 3642 sector_t p_size, p_usize, my_usize; 3643 int ldsc = 0; /* local disk size changed */ 3644 enum dds_flags ddsf; 3645 3646 peer_device = conn_peer_device(connection, pi->vnr); 3647 if (!peer_device) 3648 return config_unknown_volume(connection, pi); 3649 device = peer_device->device; 3650 3651 p_size = be64_to_cpu(p->d_size); 3652 p_usize = be64_to_cpu(p->u_size); 3653 3654 /* just store the peer's disk size for now. 3655 * we still need to figure out whether we accept that. */ 3656 device->p_size = p_size; 3657 3658 if (get_ldev(device)) { 3659 rcu_read_lock(); 3660 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3661 rcu_read_unlock(); 3662 3663 warn_if_differ_considerably(device, "lower level device sizes", 3664 p_size, drbd_get_max_capacity(device->ldev)); 3665 warn_if_differ_considerably(device, "user requested size", 3666 p_usize, my_usize); 3667 3668 /* if this is the first connect, or an otherwise expected 3669 * param exchange, choose the minimum */ 3670 if (device->state.conn == C_WF_REPORT_PARAMS) 3671 p_usize = min_not_zero(my_usize, p_usize); 3672 3673 /* Never shrink a device with usable data during connect. 3674 But allow online shrinking if we are connected. */ 3675 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) < 3676 drbd_get_capacity(device->this_bdev) && 3677 device->state.disk >= D_OUTDATED && 3678 device->state.conn < C_CONNECTED) { 3679 drbd_err(device, "The peer's disk size is too small!\n"); 3680 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3681 put_ldev(device); 3682 return -EIO; 3683 } 3684 3685 if (my_usize != p_usize) { 3686 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3687 3688 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3689 if (!new_disk_conf) { 3690 drbd_err(device, "Allocation of new disk_conf failed\n"); 3691 put_ldev(device); 3692 return -ENOMEM; 3693 } 3694 3695 mutex_lock(&connection->resource->conf_update); 3696 old_disk_conf = device->ldev->disk_conf; 3697 *new_disk_conf = *old_disk_conf; 3698 new_disk_conf->disk_size = p_usize; 3699 3700 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3701 mutex_unlock(&connection->resource->conf_update); 3702 synchronize_rcu(); 3703 kfree(old_disk_conf); 3704 3705 drbd_info(device, "Peer sets u_size to %lu sectors\n", 3706 (unsigned long)my_usize); 3707 } 3708 3709 put_ldev(device); 3710 } 3711 3712 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3713 drbd_reconsider_max_bio_size(device); 3714 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size(). 3715 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 3716 drbd_reconsider_max_bio_size(), we can be sure that after 3717 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */ 3718 3719 ddsf = be16_to_cpu(p->dds_flags); 3720 if (get_ldev(device)) { 3721 dd = drbd_determine_dev_size(device, ddsf, NULL); 3722 put_ldev(device); 3723 if (dd == DS_ERROR) 3724 return -EIO; 3725 drbd_md_sync(device); 3726 } else { 3727 /* I am diskless, need to accept the peer's size. 
*/ 3728 drbd_set_my_capacity(device, p_size); 3729 } 3730 3731 if (get_ldev(device)) { 3732 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 3733 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 3734 ldsc = 1; 3735 } 3736 3737 put_ldev(device); 3738 } 3739 3740 if (device->state.conn > C_WF_REPORT_PARAMS) { 3741 if (be64_to_cpu(p->c_size) != 3742 drbd_get_capacity(device->this_bdev) || ldsc) { 3743 /* we have different sizes, probably peer 3744 * needs to know my new size... */ 3745 drbd_send_sizes(peer_device, 0, ddsf); 3746 } 3747 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 3748 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 3749 if (device->state.pdsk >= D_INCONSISTENT && 3750 device->state.disk >= D_INCONSISTENT) { 3751 if (ddsf & DDSF_NO_RESYNC) 3752 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 3753 else 3754 resync_after_online_grow(device); 3755 } else 3756 set_bit(RESYNC_AFTER_NEG, &device->flags); 3757 } 3758 } 3759 3760 return 0; 3761 } 3762 3763 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 3764 { 3765 struct drbd_peer_device *peer_device; 3766 struct drbd_device *device; 3767 struct p_uuids *p = pi->data; 3768 u64 *p_uuid; 3769 int i, updated_uuids = 0; 3770 3771 peer_device = conn_peer_device(connection, pi->vnr); 3772 if (!peer_device) 3773 return config_unknown_volume(connection, pi); 3774 device = peer_device->device; 3775 3776 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 3777 if (!p_uuid) { 3778 drbd_err(device, "kmalloc of p_uuid failed\n"); 3779 return false; 3780 } 3781 3782 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 3783 p_uuid[i] = be64_to_cpu(p->uuid[i]); 3784 3785 kfree(device->p_uuid); 3786 device->p_uuid = p_uuid; 3787 3788 if (device->state.conn < C_CONNECTED && 3789 device->state.disk < D_INCONSISTENT && 3790 device->state.role == R_PRIMARY && 3791 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 3792 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 3793 (unsigned long long)device->ed_uuid); 3794 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3795 return -EIO; 3796 } 3797 3798 if (get_ldev(device)) { 3799 int skip_initial_sync = 3800 device->state.conn == C_CONNECTED && 3801 peer_device->connection->agreed_pro_version >= 90 && 3802 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 3803 (p_uuid[UI_FLAGS] & 8); 3804 if (skip_initial_sync) { 3805 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 3806 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 3807 "clear_n_write from receive_uuids", 3808 BM_LOCKED_TEST_ALLOWED); 3809 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 3810 _drbd_uuid_set(device, UI_BITMAP, 0); 3811 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3812 CS_VERBOSE, NULL); 3813 drbd_md_sync(device); 3814 updated_uuids = 1; 3815 } 3816 put_ldev(device); 3817 } else if (device->state.disk < D_INCONSISTENT && 3818 device->state.role == R_PRIMARY) { 3819 /* I am a diskless primary, the peer just created a new current UUID 3820 for me. */ 3821 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3822 } 3823 3824 /* Before we test for the disk state, we should wait until an eventually 3825 ongoing cluster wide state change is finished. That is important if 3826 we are primary and are detaching from our disk. 
We need to see the 3827 new disk state... */ 3828 mutex_lock(device->state_mutex); 3829 mutex_unlock(device->state_mutex); 3830 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 3831 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3832 3833 if (updated_uuids) 3834 drbd_print_uuids(device, "receiver updated UUIDs to"); 3835 3836 return 0; 3837 } 3838 3839 /** 3840 * convert_state() - Converts the peer's view of the cluster state to our point of view 3841 * @ps: The state as seen by the peer. 3842 */ 3843 static union drbd_state convert_state(union drbd_state ps) 3844 { 3845 union drbd_state ms; 3846 3847 static enum drbd_conns c_tab[] = { 3848 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 3849 [C_CONNECTED] = C_CONNECTED, 3850 3851 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3852 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3853 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3854 [C_VERIFY_S] = C_VERIFY_T, 3855 [C_MASK] = C_MASK, 3856 }; 3857 3858 ms.i = ps.i; 3859 3860 ms.conn = c_tab[ps.conn]; 3861 ms.peer = ps.role; 3862 ms.role = ps.peer; 3863 ms.pdsk = ps.disk; 3864 ms.disk = ps.pdsk; 3865 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3866 3867 return ms; 3868 } 3869 3870 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 3871 { 3872 struct drbd_peer_device *peer_device; 3873 struct drbd_device *device; 3874 struct p_req_state *p = pi->data; 3875 union drbd_state mask, val; 3876 enum drbd_state_rv rv; 3877 3878 peer_device = conn_peer_device(connection, pi->vnr); 3879 if (!peer_device) 3880 return -EIO; 3881 device = peer_device->device; 3882 3883 mask.i = be32_to_cpu(p->mask); 3884 val.i = be32_to_cpu(p->val); 3885 3886 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 3887 mutex_is_locked(device->state_mutex)) { 3888 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 3889 return 0; 3890 } 3891 3892 mask = convert_state(mask); 3893 val = convert_state(val); 3894 3895 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 3896 drbd_send_sr_reply(peer_device, rv); 3897 3898 drbd_md_sync(device); 3899 3900 return 0; 3901 } 3902 3903 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 3904 { 3905 struct p_req_state *p = pi->data; 3906 union drbd_state mask, val; 3907 enum drbd_state_rv rv; 3908 3909 mask.i = be32_to_cpu(p->mask); 3910 val.i = be32_to_cpu(p->val); 3911 3912 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 3913 mutex_is_locked(&connection->cstate_mutex)) { 3914 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 3915 return 0; 3916 } 3917 3918 mask = convert_state(mask); 3919 val = convert_state(val); 3920 3921 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 3922 conn_send_sr_reply(connection, rv); 3923 3924 return 0; 3925 } 3926 3927 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 3928 { 3929 struct drbd_peer_device *peer_device; 3930 struct drbd_device *device; 3931 struct p_state *p = pi->data; 3932 union drbd_state os, ns, peer_state; 3933 enum drbd_disk_state real_peer_disk; 3934 enum chg_state_flags cs_flags; 3935 int rv; 3936 3937 peer_device = conn_peer_device(connection, pi->vnr); 3938 if (!peer_device) 3939 return config_unknown_volume(connection, pi); 3940 device = peer_device->device; 3941 3942 peer_state.i = be32_to_cpu(p->state); 3943 3944 real_peer_disk = peer_state.disk; 3945 if (peer_state.disk == D_NEGOTIATING) { 3946 
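/* The peer reported D_NEGOTIATING, i.e. it is still attaching its disk.
 * Derive the disk state it will presumably settle in from the uuid flags
 * received with the preceding P_UUIDS packet: flag value 4 apparently
 * marks a peer whose disk was Inconsistent while negotiating, anything
 * else is treated as Consistent. */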
real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 3947 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 3948 } 3949 3950 spin_lock_irq(&device->resource->req_lock); 3951 retry: 3952 os = ns = drbd_read_state(device); 3953 spin_unlock_irq(&device->resource->req_lock); 3954 3955 /* If some other part of the code (asender thread, timeout) 3956 * already decided to close the connection again, 3957 * we must not "re-establish" it here. */ 3958 if (os.conn <= C_TEAR_DOWN) 3959 return -ECONNRESET; 3960 3961 /* If this is the "end of sync" confirmation, usually the peer disk 3962 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 3963 * set) resync started in PausedSyncT, or if the timing of pause-/ 3964 * unpause-sync events has been "just right", the peer disk may 3965 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 3966 */ 3967 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 3968 real_peer_disk == D_UP_TO_DATE && 3969 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 3970 /* If we are (becoming) SyncSource, but peer is still in sync 3971 * preparation, ignore its uptodate-ness to avoid flapping, it 3972 * will change to inconsistent once the peer reaches active 3973 * syncing states. 3974 * It may have changed syncer-paused flags, however, so we 3975 * cannot ignore this completely. */ 3976 if (peer_state.conn > C_CONNECTED && 3977 peer_state.conn < C_SYNC_SOURCE) 3978 real_peer_disk = D_INCONSISTENT; 3979 3980 /* if peer_state changes to connected at the same time, 3981 * it explicitly notifies us that it finished resync. 3982 * Maybe we should finish it up, too? */ 3983 else if (os.conn >= C_SYNC_SOURCE && 3984 peer_state.conn == C_CONNECTED) { 3985 if (drbd_bm_total_weight(device) <= device->rs_failed) 3986 drbd_resync_finished(device); 3987 return 0; 3988 } 3989 } 3990 3991 /* explicit verify finished notification, stop sector reached. */ 3992 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 3993 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 3994 ov_out_of_sync_print(device); 3995 drbd_resync_finished(device); 3996 return 0; 3997 } 3998 3999 /* peer says his disk is inconsistent, while we think it is uptodate, 4000 * and this happens while the peer still thinks we have a sync going on, 4001 * but we think we are already done with the sync. 4002 * We ignore this to avoid flapping pdsk. 4003 * This should not happen, if the peer is a recent version of drbd. 
*/ 4004 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 4005 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 4006 real_peer_disk = D_UP_TO_DATE; 4007 4008 if (ns.conn == C_WF_REPORT_PARAMS) 4009 ns.conn = C_CONNECTED; 4010 4011 if (peer_state.conn == C_AHEAD) 4012 ns.conn = C_BEHIND; 4013 4014 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 4015 get_ldev_if_state(device, D_NEGOTIATING)) { 4016 int cr; /* consider resync */ 4017 4018 /* if we established a new connection */ 4019 cr = (os.conn < C_CONNECTED); 4020 /* if we had an established connection 4021 * and one of the nodes newly attaches a disk */ 4022 cr |= (os.conn == C_CONNECTED && 4023 (peer_state.disk == D_NEGOTIATING || 4024 os.disk == D_NEGOTIATING)); 4025 /* if we have both been inconsistent, and the peer has been 4026 * forced to be UpToDate with --overwrite-data */ 4027 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 4028 /* if we had been plain connected, and the admin requested to 4029 * start a sync by "invalidate" or "invalidate-remote" */ 4030 cr |= (os.conn == C_CONNECTED && 4031 (peer_state.conn >= C_STARTING_SYNC_S && 4032 peer_state.conn <= C_WF_BITMAP_T)); 4033 4034 if (cr) 4035 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 4036 4037 put_ldev(device); 4038 if (ns.conn == C_MASK) { 4039 ns.conn = C_CONNECTED; 4040 if (device->state.disk == D_NEGOTIATING) { 4041 drbd_force_state(device, NS(disk, D_FAILED)); 4042 } else if (peer_state.disk == D_NEGOTIATING) { 4043 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 4044 peer_state.disk = D_DISKLESS; 4045 real_peer_disk = D_DISKLESS; 4046 } else { 4047 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 4048 return -EIO; 4049 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 4050 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4051 return -EIO; 4052 } 4053 } 4054 } 4055 4056 spin_lock_irq(&device->resource->req_lock); 4057 if (os.i != drbd_read_state(device).i) 4058 goto retry; 4059 clear_bit(CONSIDER_RESYNC, &device->flags); 4060 ns.peer = peer_state.role; 4061 ns.pdsk = real_peer_disk; 4062 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 4063 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 4064 ns.disk = device->new_state_tmp.disk; 4065 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4066 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4067 test_bit(NEW_CUR_UUID, &device->flags)) { 4068 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4069 for temporal network outages! 
*/ 4070 spin_unlock_irq(&device->resource->req_lock); 4071 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4072 tl_clear(peer_device->connection); 4073 drbd_uuid_new_current(device); 4074 clear_bit(NEW_CUR_UUID, &device->flags); 4075 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4076 return -EIO; 4077 } 4078 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4079 ns = drbd_read_state(device); 4080 spin_unlock_irq(&device->resource->req_lock); 4081 4082 if (rv < SS_SUCCESS) { 4083 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4084 return -EIO; 4085 } 4086 4087 if (os.conn > C_WF_REPORT_PARAMS) { 4088 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4089 peer_state.disk != D_NEGOTIATING ) { 4090 /* we want resync, peer has not yet decided to sync... */ 4091 /* Nowadays only used when forcing a node into primary role and 4092 setting its disk to UpToDate with that */ 4093 drbd_send_uuids(peer_device); 4094 drbd_send_current_state(peer_device); 4095 } 4096 } 4097 4098 clear_bit(DISCARD_MY_DATA, &device->flags); 4099 4100 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4101 4102 return 0; 4103 } 4104 4105 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4106 { 4107 struct drbd_peer_device *peer_device; 4108 struct drbd_device *device; 4109 struct p_rs_uuid *p = pi->data; 4110 4111 peer_device = conn_peer_device(connection, pi->vnr); 4112 if (!peer_device) 4113 return -EIO; 4114 device = peer_device->device; 4115 4116 wait_event(device->misc_wait, 4117 device->state.conn == C_WF_SYNC_UUID || 4118 device->state.conn == C_BEHIND || 4119 device->state.conn < C_CONNECTED || 4120 device->state.disk < D_NEGOTIATING); 4121 4122 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4123 4124 /* Here the _drbd_uuid_ functions are right, current should 4125 _not_ be rotated into the history */ 4126 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4127 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4128 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4129 4130 drbd_print_uuids(device, "updated sync uuid"); 4131 drbd_start_resync(device, C_SYNC_TARGET); 4132 4133 put_ldev(device); 4134 } else 4135 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4136 4137 return 0; 4138 } 4139 4140 /** 4141 * receive_bitmap_plain 4142 * 4143 * Return 0 when done, 1 when another iteration is needed, and a negative error 4144 * code upon failure. 
4145 */ 4146 static int 4147 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4148 unsigned long *p, struct bm_xfer_ctx *c) 4149 { 4150 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4151 drbd_header_size(peer_device->connection); 4152 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4153 c->bm_words - c->word_offset); 4154 unsigned int want = num_words * sizeof(*p); 4155 int err; 4156 4157 if (want != size) { 4158 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4159 return -EIO; 4160 } 4161 if (want == 0) 4162 return 0; 4163 err = drbd_recv_all(peer_device->connection, p, want); 4164 if (err) 4165 return err; 4166 4167 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4168 4169 c->word_offset += num_words; 4170 c->bit_offset = c->word_offset * BITS_PER_LONG; 4171 if (c->bit_offset > c->bm_bits) 4172 c->bit_offset = c->bm_bits; 4173 4174 return 1; 4175 } 4176 4177 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4178 { 4179 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4180 } 4181 4182 static int dcbp_get_start(struct p_compressed_bm *p) 4183 { 4184 return (p->encoding & 0x80) != 0; 4185 } 4186 4187 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4188 { 4189 return (p->encoding >> 4) & 0x7; 4190 } 4191 4192 /** 4193 * recv_bm_rle_bits 4194 * 4195 * Return 0 when done, 1 when another iteration is needed, and a negative error 4196 * code upon failure. 4197 */ 4198 static int 4199 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4200 struct p_compressed_bm *p, 4201 struct bm_xfer_ctx *c, 4202 unsigned int len) 4203 { 4204 struct bitstream bs; 4205 u64 look_ahead; 4206 u64 rl; 4207 u64 tmp; 4208 unsigned long s = c->bit_offset; 4209 unsigned long e; 4210 int toggle = dcbp_get_start(p); 4211 int have; 4212 int bits; 4213 4214 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4215 4216 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4217 if (bits < 0) 4218 return -EIO; 4219 4220 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4221 bits = vli_decode_bits(&rl, look_ahead); 4222 if (bits <= 0) 4223 return -EIO; 4224 4225 if (toggle) { 4226 e = s + rl -1; 4227 if (e >= c->bm_bits) { 4228 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4229 return -EIO; 4230 } 4231 _drbd_bm_set_bits(peer_device->device, s, e); 4232 } 4233 4234 if (have < bits) { 4235 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4236 have, bits, look_ahead, 4237 (unsigned int)(bs.cur.b - p->code), 4238 (unsigned int)bs.buf_len); 4239 return -EIO; 4240 } 4241 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4242 if (likely(bits < 64)) 4243 look_ahead >>= bits; 4244 else 4245 look_ahead = 0; 4246 have -= bits; 4247 4248 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4249 if (bits < 0) 4250 return -EIO; 4251 look_ahead |= tmp << have; 4252 have += bits; 4253 } 4254 4255 c->bit_offset = s; 4256 bm_xfer_ctx_bit_to_word_offset(c); 4257 4258 return (s != c->bm_bits); 4259 } 4260 4261 /** 4262 * decode_bitmap_c 4263 * 4264 * Return 0 when done, 1 when another iteration is needed, and a negative error 4265 * code upon failure. 
4266 */ 4267 static int 4268 decode_bitmap_c(struct drbd_peer_device *peer_device, 4269 struct p_compressed_bm *p, 4270 struct bm_xfer_ctx *c, 4271 unsigned int len) 4272 { 4273 if (dcbp_get_code(p) == RLE_VLI_Bits) 4274 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p)); 4275 4276 /* other variants had been implemented for evaluation, 4277 * but have been dropped as this one turned out to be "best" 4278 * during all our tests. */ 4279 4280 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 4281 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4282 return -EIO; 4283 } 4284 4285 void INFO_bm_xfer_stats(struct drbd_device *device, 4286 const char *direction, struct bm_xfer_ctx *c) 4287 { 4288 /* what would it take to transfer it "plaintext" */ 4289 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection); 4290 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size; 4291 unsigned int plain = 4292 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) + 4293 c->bm_words * sizeof(unsigned long); 4294 unsigned int total = c->bytes[0] + c->bytes[1]; 4295 unsigned int r; 4296 4297 /* total can not be zero. but just in case: */ 4298 if (total == 0) 4299 return; 4300 4301 /* don't report if not compressed */ 4302 if (total >= plain) 4303 return; 4304 4305 /* total < plain. check for overflow, still */ 4306 r = (total > UINT_MAX/1000) ? (total / (plain/1000)) 4307 : (1000 * total / plain); 4308 4309 if (r > 1000) 4310 r = 1000; 4311 4312 r = 1000 - r; 4313 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), " 4314 "total %u; compression: %u.%u%%\n", 4315 direction, 4316 c->bytes[1], c->packets[1], 4317 c->bytes[0], c->packets[0], 4318 total, r/10, r % 10); 4319 } 4320 4321 /* Since we are processing the bitfield from lower addresses to higher, 4322 it does not matter if the process it in 32 bit chunks or 64 bit 4323 chunks as long as it is little endian. (Understand it as byte stream, 4324 beginning with the lowest byte...) If we would use big endian 4325 we would need to process it from the highest address to the lowest, 4326 in order to be agnostic to the 32 vs 64 bits issue. 4327 4328 returns 0 on failure, 1 if we successfully received it. */ 4329 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi) 4330 { 4331 struct drbd_peer_device *peer_device; 4332 struct drbd_device *device; 4333 struct bm_xfer_ctx c; 4334 int err; 4335 4336 peer_device = conn_peer_device(connection, pi->vnr); 4337 if (!peer_device) 4338 return -EIO; 4339 device = peer_device->device; 4340 4341 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED); 4342 /* you are supposed to send additional out-of-sync information 4343 * if you actually set bits during this phase */ 4344 4345 c = (struct bm_xfer_ctx) { 4346 .bm_bits = drbd_bm_bits(device), 4347 .bm_words = drbd_bm_words(device), 4348 }; 4349 4350 for(;;) { 4351 if (pi->cmd == P_BITMAP) 4352 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c); 4353 else if (pi->cmd == P_COMPRESSED_BITMAP) { 4354 /* MAYBE: sanity check that we speak proto >= 90, 4355 * and the feature is enabled! 
*/ 4356 struct p_compressed_bm *p = pi->data; 4357 4358 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) { 4359 drbd_err(device, "ReportCBitmap packet too large\n"); 4360 err = -EIO; 4361 goto out; 4362 } 4363 if (pi->size <= sizeof(*p)) { 4364 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size); 4365 err = -EIO; 4366 goto out; 4367 } 4368 err = drbd_recv_all(peer_device->connection, p, pi->size); 4369 if (err) 4370 goto out; 4371 err = decode_bitmap_c(peer_device, p, &c, pi->size); 4372 } else { 4373 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd); 4374 err = -EIO; 4375 goto out; 4376 } 4377 4378 c.packets[pi->cmd == P_BITMAP]++; 4379 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size; 4380 4381 if (err <= 0) { 4382 if (err < 0) 4383 goto out; 4384 break; 4385 } 4386 err = drbd_recv_header(peer_device->connection, pi); 4387 if (err) 4388 goto out; 4389 } 4390 4391 INFO_bm_xfer_stats(device, "receive", &c); 4392 4393 if (device->state.conn == C_WF_BITMAP_T) { 4394 enum drbd_state_rv rv; 4395 4396 err = drbd_send_bitmap(device); 4397 if (err) 4398 goto out; 4399 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 4400 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4401 D_ASSERT(device, rv == SS_SUCCESS); 4402 } else if (device->state.conn != C_WF_BITMAP_S) { 4403 /* admin may have requested C_DISCONNECTING, 4404 * other threads may have noticed network errors */ 4405 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4406 drbd_conn_str(device->state.conn)); 4407 } 4408 err = 0; 4409 4410 out: 4411 drbd_bm_unlock(device); 4412 if (!err && device->state.conn == C_WF_BITMAP_S) 4413 drbd_start_resync(device, C_SYNC_SOURCE); 4414 return err; 4415 } 4416 4417 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4418 { 4419 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4420 pi->cmd, pi->size); 4421 4422 return ignore_remaining_packet(connection, pi); 4423 } 4424 4425 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4426 { 4427 /* Make sure we've acked all the TCP data associated 4428 * with the data requests being unplugged */ 4429 drbd_tcp_quickack(connection->data.socket); 4430 4431 return 0; 4432 } 4433 4434 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4435 { 4436 struct drbd_peer_device *peer_device; 4437 struct drbd_device *device; 4438 struct p_block_desc *p = pi->data; 4439 4440 peer_device = conn_peer_device(connection, pi->vnr); 4441 if (!peer_device) 4442 return -EIO; 4443 device = peer_device->device; 4444 4445 switch (device->state.conn) { 4446 case C_WF_SYNC_UUID: 4447 case C_WF_BITMAP_T: 4448 case C_BEHIND: 4449 break; 4450 default: 4451 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4452 drbd_conn_str(device->state.conn)); 4453 } 4454 4455 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4456 4457 return 0; 4458 } 4459 4460 struct data_cmd { 4461 int expect_payload; 4462 size_t pkt_size; 4463 int (*fn)(struct drbd_connection *, struct packet_info *); 4464 }; 4465 4466 static struct data_cmd drbd_cmd_handler[] = { 4467 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4468 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4469 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), 
receive_RSDataReply } , 4470 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4471 [P_BITMAP] = { 1, 0, receive_bitmap } , 4472 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4473 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4474 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4475 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4476 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4477 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4478 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4479 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4480 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4481 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4482 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4483 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4484 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4485 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4486 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4487 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4488 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4489 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4490 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4491 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data }, 4492 }; 4493 4494 static void drbdd(struct drbd_connection *connection) 4495 { 4496 struct packet_info pi; 4497 size_t shs; /* sub header size */ 4498 int err; 4499 4500 while (get_t_state(&connection->receiver) == RUNNING) { 4501 struct data_cmd *cmd; 4502 4503 drbd_thread_current_set_cpu(&connection->receiver); 4504 if (drbd_recv_header(connection, &pi)) 4505 goto err_out; 4506 4507 cmd = &drbd_cmd_handler[pi.cmd]; 4508 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4509 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4510 cmdname(pi.cmd), pi.cmd); 4511 goto err_out; 4512 } 4513 4514 shs = cmd->pkt_size; 4515 if (pi.size > shs && !cmd->expect_payload) { 4516 drbd_err(connection, "No payload expected %s l:%d\n", 4517 cmdname(pi.cmd), pi.size); 4518 goto err_out; 4519 } 4520 4521 if (shs) { 4522 err = drbd_recv_all_warn(connection, pi.data, shs); 4523 if (err) 4524 goto err_out; 4525 pi.size -= shs; 4526 } 4527 4528 err = cmd->fn(connection, &pi); 4529 if (err) { 4530 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4531 cmdname(pi.cmd), err, pi.size); 4532 goto err_out; 4533 } 4534 } 4535 return; 4536 4537 err_out: 4538 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4539 } 4540 4541 static void conn_disconnect(struct drbd_connection *connection) 4542 { 4543 struct drbd_peer_device *peer_device; 4544 enum drbd_conns oc; 4545 int vnr; 4546 4547 if (connection->cstate == C_STANDALONE) 4548 return; 4549 4550 /* We are about to start the cleanup after connection loss. 4551 * Make sure drbd_make_request knows about that. 4552 * Usually we should be in some network failure state already, 4553 * but just in case we are not, we fix it up here. 4554 */ 4555 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4556 4557 /* asender does not clean up anything. 
it must not interfere, either */ 4558 drbd_thread_stop(&connection->asender); 4559 drbd_free_sock(connection); 4560 4561 rcu_read_lock(); 4562 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 4563 struct drbd_device *device = peer_device->device; 4564 kref_get(&device->kref); 4565 rcu_read_unlock(); 4566 drbd_disconnected(peer_device); 4567 kref_put(&device->kref, drbd_destroy_device); 4568 rcu_read_lock(); 4569 } 4570 rcu_read_unlock(); 4571 4572 if (!list_empty(&connection->current_epoch->list)) 4573 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 4574 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4575 atomic_set(&connection->current_epoch->epoch_size, 0); 4576 connection->send.seen_any_write_yet = false; 4577 4578 drbd_info(connection, "Connection closed\n"); 4579 4580 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 4581 conn_try_outdate_peer_async(connection); 4582 4583 spin_lock_irq(&connection->resource->req_lock); 4584 oc = connection->cstate; 4585 if (oc >= C_UNCONNECTED) 4586 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 4587 4588 spin_unlock_irq(&connection->resource->req_lock); 4589 4590 if (oc == C_DISCONNECTING) 4591 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 4592 } 4593 4594 static int drbd_disconnected(struct drbd_peer_device *peer_device) 4595 { 4596 struct drbd_device *device = peer_device->device; 4597 unsigned int i; 4598 4599 /* wait for current activity to cease. */ 4600 spin_lock_irq(&device->resource->req_lock); 4601 _drbd_wait_ee_list_empty(device, &device->active_ee); 4602 _drbd_wait_ee_list_empty(device, &device->sync_ee); 4603 _drbd_wait_ee_list_empty(device, &device->read_ee); 4604 spin_unlock_irq(&device->resource->req_lock); 4605 4606 /* We do not have data structures that would allow us to 4607 * get the rs_pending_cnt down to 0 again. 4608 * * On C_SYNC_TARGET we do not have any data structures describing 4609 * the pending RSDataRequest's we have sent. 4610 * * On C_SYNC_SOURCE there is no data structure that tracks 4611 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 4612 * And no, it is not the sum of the reference counts in the 4613 * resync_LRU. The resync_LRU tracks the whole operation including 4614 * the disk-IO, while the rs_pending_cnt only tracks the blocks 4615 * on the fly. */ 4616 drbd_rs_cancel_all(device); 4617 device->rs_total = 0; 4618 device->rs_failed = 0; 4619 atomic_set(&device->rs_pending_cnt, 0); 4620 wake_up(&device->misc_wait); 4621 4622 del_timer_sync(&device->resync_timer); 4623 resync_timer_fn((unsigned long)device); 4624 4625 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 4626 * w_make_resync_request etc. which may still be on the worker queue 4627 * to be "canceled" */ 4628 drbd_flush_workqueue(&peer_device->connection->sender_work); 4629 4630 drbd_finish_peer_reqs(device); 4631 4632 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs() 4633 might have issued a work again. The one before drbd_finish_peer_reqs() is 4634 necessary to reclain net_ee in drbd_finish_peer_reqs(). */ 4635 drbd_flush_workqueue(&peer_device->connection->sender_work); 4636 4637 /* need to do it again, drbd_finish_peer_reqs() may have populated it 4638 * again via drbd_try_clear_on_disk_bm(). 
*/ 4639 drbd_rs_cancel_all(device); 4640 4641 kfree(device->p_uuid); 4642 device->p_uuid = NULL; 4643 4644 if (!drbd_suspended(device)) 4645 tl_clear(peer_device->connection); 4646 4647 drbd_md_sync(device); 4648 4649 /* serialize with bitmap writeout triggered by the state change, 4650 * if any. */ 4651 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags)); 4652 4653 /* tcp_close and release of sendpage pages can be deferred. I don't 4654 * want to use SO_LINGER, because apparently it can be deferred for 4655 * more than 20 seconds (longest time I checked). 4656 * 4657 * Actually we don't care for exactly when the network stack does its 4658 * put_page(), but release our reference on these pages right here. 4659 */ 4660 i = drbd_free_peer_reqs(device, &device->net_ee); 4661 if (i) 4662 drbd_info(device, "net_ee not empty, killed %u entries\n", i); 4663 i = atomic_read(&device->pp_in_use_by_net); 4664 if (i) 4665 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 4666 i = atomic_read(&device->pp_in_use); 4667 if (i) 4668 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 4669 4670 D_ASSERT(device, list_empty(&device->read_ee)); 4671 D_ASSERT(device, list_empty(&device->active_ee)); 4672 D_ASSERT(device, list_empty(&device->sync_ee)); 4673 D_ASSERT(device, list_empty(&device->done_ee)); 4674 4675 return 0; 4676 } 4677 4678 /* 4679 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 4680 * we can agree on is stored in agreed_pro_version. 4681 * 4682 * feature flags and the reserved array should be enough room for future 4683 * enhancements of the handshake protocol, and possible plugins... 4684 * 4685 * for now, they are expected to be zero, but ignored. 4686 */ 4687 static int drbd_send_features(struct drbd_connection *connection) 4688 { 4689 struct drbd_socket *sock; 4690 struct p_connection_features *p; 4691 4692 sock = &connection->data; 4693 p = conn_prepare_command(connection, sock); 4694 if (!p) 4695 return -EIO; 4696 memset(p, 0, sizeof(*p)); 4697 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 4698 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 4699 p->feature_flags = cpu_to_be32(PRO_FEATURES); 4700 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 4701 } 4702 4703 /* 4704 * return values: 4705 * 1 yes, we have a valid connection 4706 * 0 oops, did not work out, please try again 4707 * -1 peer talks different language, 4708 * no point in trying again, please go standalone. 4709 */ 4710 static int drbd_do_features(struct drbd_connection *connection) 4711 { 4712 /* ASSERT current == connection->receiver ... 
*/ 4713 struct p_connection_features *p; 4714 const int expect = sizeof(struct p_connection_features); 4715 struct packet_info pi; 4716 int err; 4717 4718 err = drbd_send_features(connection); 4719 if (err) 4720 return 0; 4721 4722 err = drbd_recv_header(connection, &pi); 4723 if (err) 4724 return 0; 4725 4726 if (pi.cmd != P_CONNECTION_FEATURES) { 4727 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n", 4728 cmdname(pi.cmd), pi.cmd); 4729 return -1; 4730 } 4731 4732 if (pi.size != expect) { 4733 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n", 4734 expect, pi.size); 4735 return -1; 4736 } 4737 4738 p = pi.data; 4739 err = drbd_recv_all_warn(connection, p, expect); 4740 if (err) 4741 return 0; 4742 4743 p->protocol_min = be32_to_cpu(p->protocol_min); 4744 p->protocol_max = be32_to_cpu(p->protocol_max); 4745 if (p->protocol_max == 0) 4746 p->protocol_max = p->protocol_min; 4747 4748 if (PRO_VERSION_MAX < p->protocol_min || 4749 PRO_VERSION_MIN > p->protocol_max) 4750 goto incompat; 4751 4752 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max); 4753 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags); 4754 4755 drbd_info(connection, "Handshake successful: " 4756 "Agreed network protocol version %d\n", connection->agreed_pro_version); 4757 4758 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n", 4759 connection->agreed_features & FF_TRIM ? " " : " not "); 4760 4761 return 1; 4762 4763 incompat: 4764 drbd_err(connection, "incompatible DRBD dialects: " 4765 "I support %d-%d, peer supports %d-%d\n", 4766 PRO_VERSION_MIN, PRO_VERSION_MAX, 4767 p->protocol_min, p->protocol_max); 4768 return -1; 4769 } 4770 4771 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE) 4772 static int drbd_do_auth(struct drbd_connection *connection) 4773 { 4774 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n"); 4775 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n"); 4776 return -1; 4777 } 4778 #else 4779 #define CHALLENGE_LEN 64 4780 4781 /* Return value: 4782 1 - auth succeeded, 4783 0 - failed, try again (network error), 4784 -1 - auth failed, don't try again. 4785 */ 4786 4787 static int drbd_do_auth(struct drbd_connection *connection) 4788 { 4789 struct drbd_socket *sock; 4790 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */ 4791 struct scatterlist sg; 4792 char *response = NULL; 4793 char *right_response = NULL; 4794 char *peers_ch = NULL; 4795 unsigned int key_len; 4796 char secret[SHARED_SECRET_MAX]; /* 64 byte */ 4797 unsigned int resp_size; 4798 struct hash_desc desc; 4799 struct packet_info pi; 4800 struct net_conf *nc; 4801 int err, rv; 4802 4803 /* FIXME: Put the challenge/response into the preallocated socket buffer. 
*/ 4804 4805 rcu_read_lock(); 4806 nc = rcu_dereference(connection->net_conf); 4807 key_len = strlen(nc->shared_secret); 4808 memcpy(secret, nc->shared_secret, key_len); 4809 rcu_read_unlock(); 4810 4811 desc.tfm = connection->cram_hmac_tfm; 4812 desc.flags = 0; 4813 4814 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 4815 if (rv) { 4816 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv); 4817 rv = -1; 4818 goto fail; 4819 } 4820 4821 get_random_bytes(my_challenge, CHALLENGE_LEN); 4822 4823 sock = &connection->data; 4824 if (!conn_prepare_command(connection, sock)) { 4825 rv = 0; 4826 goto fail; 4827 } 4828 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 4829 my_challenge, CHALLENGE_LEN); 4830 if (!rv) 4831 goto fail; 4832 4833 err = drbd_recv_header(connection, &pi); 4834 if (err) { 4835 rv = 0; 4836 goto fail; 4837 } 4838 4839 if (pi.cmd != P_AUTH_CHALLENGE) { 4840 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 4841 cmdname(pi.cmd), pi.cmd); 4842 rv = 0; 4843 goto fail; 4844 } 4845 4846 if (pi.size > CHALLENGE_LEN * 2) { 4847 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 4848 rv = -1; 4849 goto fail; 4850 } 4851 4852 if (pi.size < CHALLENGE_LEN) { 4853 drbd_err(connection, "AuthChallenge payload too small.\n"); 4854 rv = -1; 4855 goto fail; 4856 } 4857 4858 peers_ch = kmalloc(pi.size, GFP_NOIO); 4859 if (peers_ch == NULL) { 4860 drbd_err(connection, "kmalloc of peers_ch failed\n"); 4861 rv = -1; 4862 goto fail; 4863 } 4864 4865 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 4866 if (err) { 4867 rv = 0; 4868 goto fail; 4869 } 4870 4871 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 4872 drbd_err(connection, "Peer presented the same challenge!\n"); 4873 rv = -1; 4874 goto fail; 4875 } 4876 4877 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm); 4878 response = kmalloc(resp_size, GFP_NOIO); 4879 if (response == NULL) { 4880 drbd_err(connection, "kmalloc of response failed\n"); 4881 rv = -1; 4882 goto fail; 4883 } 4884 4885 sg_init_table(&sg, 1); 4886 sg_set_buf(&sg, peers_ch, pi.size); 4887 4888 rv = crypto_hash_digest(&desc, &sg, sg.length, response); 4889 if (rv) { 4890 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4891 rv = -1; 4892 goto fail; 4893 } 4894 4895 if (!conn_prepare_command(connection, sock)) { 4896 rv = 0; 4897 goto fail; 4898 } 4899 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 4900 response, resp_size); 4901 if (!rv) 4902 goto fail; 4903 4904 err = drbd_recv_header(connection, &pi); 4905 if (err) { 4906 rv = 0; 4907 goto fail; 4908 } 4909 4910 if (pi.cmd != P_AUTH_RESPONSE) { 4911 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 4912 cmdname(pi.cmd), pi.cmd); 4913 rv = 0; 4914 goto fail; 4915 } 4916 4917 if (pi.size != resp_size) { 4918 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 4919 rv = 0; 4920 goto fail; 4921 } 4922 4923 err = drbd_recv_all_warn(connection, response , resp_size); 4924 if (err) { 4925 rv = 0; 4926 goto fail; 4927 } 4928 4929 right_response = kmalloc(resp_size, GFP_NOIO); 4930 if (right_response == NULL) { 4931 drbd_err(connection, "kmalloc of right_response failed\n"); 4932 rv = -1; 4933 goto fail; 4934 } 4935 4936 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN); 4937 4938 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response); 4939 if (rv) { 4940 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 4941 
rv = -1; 4942 goto fail; 4943 } 4944 4945 rv = !memcmp(response, right_response, resp_size); 4946 4947 if (rv) 4948 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 4949 resp_size); 4950 else 4951 rv = -1; 4952 4953 fail: 4954 kfree(peers_ch); 4955 kfree(response); 4956 kfree(right_response); 4957 4958 return rv; 4959 } 4960 #endif 4961 4962 int drbd_receiver(struct drbd_thread *thi) 4963 { 4964 struct drbd_connection *connection = thi->connection; 4965 int h; 4966 4967 drbd_info(connection, "receiver (re)started\n"); 4968 4969 do { 4970 h = conn_connect(connection); 4971 if (h == 0) { 4972 conn_disconnect(connection); 4973 schedule_timeout_interruptible(HZ); 4974 } 4975 if (h == -1) { 4976 drbd_warn(connection, "Discarding network configuration.\n"); 4977 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 4978 } 4979 } while (h == 0); 4980 4981 if (h > 0) 4982 drbdd(connection); 4983 4984 conn_disconnect(connection); 4985 4986 drbd_info(connection, "receiver terminated\n"); 4987 return 0; 4988 } 4989 4990 /* ********* acknowledge sender ******** */ 4991 4992 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 4993 { 4994 struct p_req_state_reply *p = pi->data; 4995 int retcode = be32_to_cpu(p->retcode); 4996 4997 if (retcode >= SS_SUCCESS) { 4998 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 4999 } else { 5000 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 5001 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 5002 drbd_set_st_err_str(retcode), retcode); 5003 } 5004 wake_up(&connection->ping_wait); 5005 5006 return 0; 5007 } 5008 5009 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5010 { 5011 struct drbd_peer_device *peer_device; 5012 struct drbd_device *device; 5013 struct p_req_state_reply *p = pi->data; 5014 int retcode = be32_to_cpu(p->retcode); 5015 5016 peer_device = conn_peer_device(connection, pi->vnr); 5017 if (!peer_device) 5018 return -EIO; 5019 device = peer_device->device; 5020 5021 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 5022 D_ASSERT(device, connection->agreed_pro_version < 100); 5023 return got_conn_RqSReply(connection, pi); 5024 } 5025 5026 if (retcode >= SS_SUCCESS) { 5027 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 5028 } else { 5029 set_bit(CL_ST_CHG_FAIL, &device->flags); 5030 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 5031 drbd_set_st_err_str(retcode), retcode); 5032 } 5033 wake_up(&device->state_wait); 5034 5035 return 0; 5036 } 5037 5038 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 5039 { 5040 return drbd_send_ping_ack(connection); 5041 5042 } 5043 5044 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 5045 { 5046 /* restore idle timeout */ 5047 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 5048 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 5049 wake_up(&connection->ping_wait); 5050 5051 return 0; 5052 } 5053 5054 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 5055 { 5056 struct drbd_peer_device *peer_device; 5057 struct drbd_device *device; 5058 struct p_block_ack *p = pi->data; 5059 sector_t sector = be64_to_cpu(p->sector); 5060 int blksize = be32_to_cpu(p->blksize); 5061 5062 peer_device = conn_peer_device(connection, pi->vnr); 5063 if (!peer_device) 5064 return -EIO; 5065 device = peer_device->device; 5066 5067 D_ASSERT(device, 
peer_device->connection->agreed_pro_version >= 89); 5068 5069 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5070 5071 if (get_ldev(device)) { 5072 drbd_rs_complete_io(device, sector); 5073 drbd_set_in_sync(device, sector, blksize); 5074 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 5075 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 5076 put_ldev(device); 5077 } 5078 dec_rs_pending(device); 5079 atomic_add(blksize >> 9, &device->rs_sect_in); 5080 5081 return 0; 5082 } 5083 5084 static int 5085 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector, 5086 struct rb_root *root, const char *func, 5087 enum drbd_req_event what, bool missing_ok) 5088 { 5089 struct drbd_request *req; 5090 struct bio_and_error m; 5091 5092 spin_lock_irq(&device->resource->req_lock); 5093 req = find_request(device, root, id, sector, missing_ok, func); 5094 if (unlikely(!req)) { 5095 spin_unlock_irq(&device->resource->req_lock); 5096 return -EIO; 5097 } 5098 __req_mod(req, what, &m); 5099 spin_unlock_irq(&device->resource->req_lock); 5100 5101 if (m.bio) 5102 complete_master_bio(device, &m); 5103 return 0; 5104 } 5105 5106 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5107 { 5108 struct drbd_peer_device *peer_device; 5109 struct drbd_device *device; 5110 struct p_block_ack *p = pi->data; 5111 sector_t sector = be64_to_cpu(p->sector); 5112 int blksize = be32_to_cpu(p->blksize); 5113 enum drbd_req_event what; 5114 5115 peer_device = conn_peer_device(connection, pi->vnr); 5116 if (!peer_device) 5117 return -EIO; 5118 device = peer_device->device; 5119 5120 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5121 5122 if (p->block_id == ID_SYNCER) { 5123 drbd_set_in_sync(device, sector, blksize); 5124 dec_rs_pending(device); 5125 return 0; 5126 } 5127 switch (pi->cmd) { 5128 case P_RS_WRITE_ACK: 5129 what = WRITE_ACKED_BY_PEER_AND_SIS; 5130 break; 5131 case P_WRITE_ACK: 5132 what = WRITE_ACKED_BY_PEER; 5133 break; 5134 case P_RECV_ACK: 5135 what = RECV_ACKED_BY_PEER; 5136 break; 5137 case P_SUPERSEDED: 5138 what = CONFLICT_RESOLVED; 5139 break; 5140 case P_RETRY_WRITE: 5141 what = POSTPONE_WRITE; 5142 break; 5143 default: 5144 BUG(); 5145 } 5146 5147 return validate_req_change_req_state(device, p->block_id, sector, 5148 &device->write_requests, __func__, 5149 what, false); 5150 } 5151 5152 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5153 { 5154 struct drbd_peer_device *peer_device; 5155 struct drbd_device *device; 5156 struct p_block_ack *p = pi->data; 5157 sector_t sector = be64_to_cpu(p->sector); 5158 int size = be32_to_cpu(p->blksize); 5159 int err; 5160 5161 peer_device = conn_peer_device(connection, pi->vnr); 5162 if (!peer_device) 5163 return -EIO; 5164 device = peer_device->device; 5165 5166 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5167 5168 if (p->block_id == ID_SYNCER) { 5169 dec_rs_pending(device); 5170 drbd_rs_failed_io(device, sector, size); 5171 return 0; 5172 } 5173 5174 err = validate_req_change_req_state(device, p->block_id, sector, 5175 &device->write_requests, __func__, 5176 NEG_ACKED, true); 5177 if (err) { 5178 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5179 The master bio might already be completed, therefore the 5180 request is no longer in the collision hash. */ 5181 /* In Protocol B we might already have got a P_RECV_ACK 5182 but then get a P_NEG_ACK afterwards. 
*/ 5183 drbd_set_out_of_sync(device, sector, size); 5184 } 5185 return 0; 5186 } 5187 5188 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5189 { 5190 struct drbd_peer_device *peer_device; 5191 struct drbd_device *device; 5192 struct p_block_ack *p = pi->data; 5193 sector_t sector = be64_to_cpu(p->sector); 5194 5195 peer_device = conn_peer_device(connection, pi->vnr); 5196 if (!peer_device) 5197 return -EIO; 5198 device = peer_device->device; 5199 5200 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5201 5202 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5203 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5204 5205 return validate_req_change_req_state(device, p->block_id, sector, 5206 &device->read_requests, __func__, 5207 NEG_ACKED, false); 5208 } 5209 5210 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5211 { 5212 struct drbd_peer_device *peer_device; 5213 struct drbd_device *device; 5214 sector_t sector; 5215 int size; 5216 struct p_block_ack *p = pi->data; 5217 5218 peer_device = conn_peer_device(connection, pi->vnr); 5219 if (!peer_device) 5220 return -EIO; 5221 device = peer_device->device; 5222 5223 sector = be64_to_cpu(p->sector); 5224 size = be32_to_cpu(p->blksize); 5225 5226 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5227 5228 dec_rs_pending(device); 5229 5230 if (get_ldev_if_state(device, D_FAILED)) { 5231 drbd_rs_complete_io(device, sector); 5232 switch (pi->cmd) { 5233 case P_NEG_RS_DREPLY: 5234 drbd_rs_failed_io(device, sector, size); 5235 case P_RS_CANCEL: 5236 break; 5237 default: 5238 BUG(); 5239 } 5240 put_ldev(device); 5241 } 5242 5243 return 0; 5244 } 5245 5246 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5247 { 5248 struct p_barrier_ack *p = pi->data; 5249 struct drbd_peer_device *peer_device; 5250 int vnr; 5251 5252 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5253 5254 rcu_read_lock(); 5255 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5256 struct drbd_device *device = peer_device->device; 5257 5258 if (device->state.conn == C_AHEAD && 5259 atomic_read(&device->ap_in_flight) == 0 && 5260 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5261 device->start_resync_timer.expires = jiffies + HZ; 5262 add_timer(&device->start_resync_timer); 5263 } 5264 } 5265 rcu_read_unlock(); 5266 5267 return 0; 5268 } 5269 5270 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5271 { 5272 struct drbd_peer_device *peer_device; 5273 struct drbd_device *device; 5274 struct p_block_ack *p = pi->data; 5275 struct drbd_device_work *dw; 5276 sector_t sector; 5277 int size; 5278 5279 peer_device = conn_peer_device(connection, pi->vnr); 5280 if (!peer_device) 5281 return -EIO; 5282 device = peer_device->device; 5283 5284 sector = be64_to_cpu(p->sector); 5285 size = be32_to_cpu(p->blksize); 5286 5287 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5288 5289 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5290 drbd_ov_out_of_sync_found(device, sector, size); 5291 else 5292 ov_out_of_sync_print(device); 5293 5294 if (!get_ldev(device)) 5295 return 0; 5296 5297 drbd_rs_complete_io(device, sector); 5298 dec_rs_pending(device); 5299 5300 --device->ov_left; 5301 5302 /* let's advance progress step marks only for every other megabyte */ 5303 if ((device->ov_left & 0x200) == 0x200) 5304 drbd_advance_rs_marks(device, device->ov_left); 5305 5306 if (device->ov_left 
== 0) { 5307 dw = kmalloc(sizeof(*dw), GFP_NOIO); 5308 if (dw) { 5309 dw->w.cb = w_ov_finished; 5310 dw->device = device; 5311 drbd_queue_work(&peer_device->connection->sender_work, &dw->w); 5312 } else { 5313 drbd_err(device, "kmalloc(dw) failed."); 5314 ov_out_of_sync_print(device); 5315 drbd_resync_finished(device); 5316 } 5317 } 5318 put_ldev(device); 5319 return 0; 5320 } 5321 5322 static int got_skip(struct drbd_connection *connection, struct packet_info *pi) 5323 { 5324 return 0; 5325 } 5326 5327 static int connection_finish_peer_reqs(struct drbd_connection *connection) 5328 { 5329 struct drbd_peer_device *peer_device; 5330 int vnr, not_empty = 0; 5331 5332 do { 5333 clear_bit(SIGNAL_ASENDER, &connection->flags); 5334 flush_signals(current); 5335 5336 rcu_read_lock(); 5337 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5338 struct drbd_device *device = peer_device->device; 5339 kref_get(&device->kref); 5340 rcu_read_unlock(); 5341 if (drbd_finish_peer_reqs(device)) { 5342 kref_put(&device->kref, drbd_destroy_device); 5343 return 1; 5344 } 5345 kref_put(&device->kref, drbd_destroy_device); 5346 rcu_read_lock(); 5347 } 5348 set_bit(SIGNAL_ASENDER, &connection->flags); 5349 5350 spin_lock_irq(&connection->resource->req_lock); 5351 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5352 struct drbd_device *device = peer_device->device; 5353 not_empty = !list_empty(&device->done_ee); 5354 if (not_empty) 5355 break; 5356 } 5357 spin_unlock_irq(&connection->resource->req_lock); 5358 rcu_read_unlock(); 5359 } while (not_empty); 5360 5361 return 0; 5362 } 5363 5364 struct asender_cmd { 5365 size_t pkt_size; 5366 int (*fn)(struct drbd_connection *connection, struct packet_info *); 5367 }; 5368 5369 static struct asender_cmd asender_tbl[] = { 5370 [P_PING] = { 0, got_Ping }, 5371 [P_PING_ACK] = { 0, got_PingAck }, 5372 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5373 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5374 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5375 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck }, 5376 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck }, 5377 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply }, 5378 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5379 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult }, 5380 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, 5381 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, 5382 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, 5383 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, 5384 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5385 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply }, 5386 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck }, 5387 }; 5388 5389 int drbd_asender(struct drbd_thread *thi) 5390 { 5391 struct drbd_connection *connection = thi->connection; 5392 struct asender_cmd *cmd = NULL; 5393 struct packet_info pi; 5394 int rv; 5395 void *buf = connection->meta.rbuf; 5396 int received = 0; 5397 unsigned int header_size = drbd_header_size(connection); 5398 int expect = header_size; 5399 bool ping_timeout_active = false; 5400 struct net_conf *nc; 5401 int ping_timeo, tcp_cork, ping_int; 5402 struct sched_param param = { .sched_priority = 2 }; 5403 5404 rv = sched_setscheduler(current, SCHED_RR, &param); 5405 if (rv < 0) 5406
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork = nc->tcp_cork;
		ping_int = nc->ping_int;
		rcu_read_unlock();

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(connection->meta.socket);
		if (connection_finish_peer_reqs(connection)) {
			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (tcp_cork)
			drbd_tcp_uncork(connection->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);
		clear_bit(SIGNAL_ASENDER, &connection->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received,
				       jiffies - connection->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}
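
		/* Receive state machine: first collect a full header, decode it
		 * and look up the handler plus expected payload size in
		 * asender_tbl; keep receiving until header and payload are
		 * complete, then dispatch to the handler. */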
		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf	 = connection->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &connection->flags);

	drbd_info(connection, "asender terminated\n");

	return 0;
}