1 /* 2 drbd_receiver.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 */ 24 25 26 #include <linux/module.h> 27 28 #include <asm/uaccess.h> 29 #include <net/sock.h> 30 31 #include <linux/drbd.h> 32 #include <linux/fs.h> 33 #include <linux/file.h> 34 #include <linux/in.h> 35 #include <linux/mm.h> 36 #include <linux/memcontrol.h> 37 #include <linux/mm_inline.h> 38 #include <linux/slab.h> 39 #include <linux/pkt_sched.h> 40 #define __KERNEL_SYSCALLS__ 41 #include <linux/unistd.h> 42 #include <linux/vmalloc.h> 43 #include <linux/random.h> 44 #include <linux/string.h> 45 #include <linux/scatterlist.h> 46 #include "drbd_int.h" 47 #include "drbd_protocol.h" 48 #include "drbd_req.h" 49 #include "drbd_vli.h" 50 51 #define PRO_FEATURES (FF_TRIM) 52 53 struct packet_info { 54 enum drbd_packet cmd; 55 unsigned int size; 56 unsigned int vnr; 57 void *data; 58 }; 59 60 enum finish_epoch { 61 FE_STILL_LIVE, 62 FE_DESTROYED, 63 FE_RECYCLED, 64 }; 65 66 static int drbd_do_features(struct drbd_connection *connection); 67 static int drbd_do_auth(struct drbd_connection *connection); 68 static int drbd_disconnected(struct drbd_peer_device *); 69 static void conn_wait_active_ee_empty(struct drbd_connection *connection); 70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event); 71 static int e_end_block(struct drbd_work *, int); 72 73 74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 75 76 /* 77 * some helper functions to deal with single linked page lists, 78 * page->private being our "next" pointer. 79 */ 80 81 /* If at least n pages are linked at head, get n pages off. 82 * Otherwise, don't modify head, and return NULL. 83 * Locking is the responsibility of the caller. 84 */ 85 static struct page *page_chain_del(struct page **head, int n) 86 { 87 struct page *page; 88 struct page *tmp; 89 90 BUG_ON(!n); 91 BUG_ON(!head); 92 93 page = *head; 94 95 if (!page) 96 return NULL; 97 98 while (page) { 99 tmp = page_chain_next(page); 100 if (--n == 0) 101 break; /* found sufficient pages */ 102 if (tmp == NULL) 103 /* insufficient pages, don't use any of them. */ 104 return NULL; 105 page = tmp; 106 } 107 108 /* add end of list marker for the returned list */ 109 set_page_private(page, 0); 110 /* actual return value, and adjustment of head */ 111 page = *head; 112 *head = tmp; 113 return page; 114 } 115 116 /* may be used outside of locks to find the tail of a (usually short) 117 * "private" page chain, before adding it back to a global chain head 118 * with page_chain_add() under a spinlock. 
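 *
 * The expected call sequence (mirroring drbd_free_pages() below) is
 * roughly:
 *
 *	tail = page_chain_tail(chain, &n);	(no lock held yet)
 *	spin_lock(&drbd_pp_lock);
 *	page_chain_add(&drbd_pp_pool, chain, tail);
 *	drbd_pp_vacant += n;
 *	spin_unlock(&drbd_pp_lock);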
*/ 119 static struct page *page_chain_tail(struct page *page, int *len) 120 { 121 struct page *tmp; 122 int i = 1; 123 while ((tmp = page_chain_next(page))) 124 ++i, page = tmp; 125 if (len) 126 *len = i; 127 return page; 128 } 129 130 static int page_chain_free(struct page *page) 131 { 132 struct page *tmp; 133 int i = 0; 134 page_chain_for_each_safe(page, tmp) { 135 put_page(page); 136 ++i; 137 } 138 return i; 139 } 140 141 static void page_chain_add(struct page **head, 142 struct page *chain_first, struct page *chain_last) 143 { 144 #if 1 145 struct page *tmp; 146 tmp = page_chain_tail(chain_first, NULL); 147 BUG_ON(tmp != chain_last); 148 #endif 149 150 /* add chain to head */ 151 set_page_private(chain_last, (unsigned long)*head); 152 *head = chain_first; 153 } 154 155 static struct page *__drbd_alloc_pages(struct drbd_device *device, 156 unsigned int number) 157 { 158 struct page *page = NULL; 159 struct page *tmp = NULL; 160 unsigned int i = 0; 161 162 /* Yes, testing drbd_pp_vacant outside the lock is racy. 163 * So what. It saves a spin_lock. */ 164 if (drbd_pp_vacant >= number) { 165 spin_lock(&drbd_pp_lock); 166 page = page_chain_del(&drbd_pp_pool, number); 167 if (page) 168 drbd_pp_vacant -= number; 169 spin_unlock(&drbd_pp_lock); 170 if (page) 171 return page; 172 } 173 174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 175 * "criss-cross" setup, that might cause write-out on some other DRBD, 176 * which in turn might block on the other node at this very place. */ 177 for (i = 0; i < number; i++) { 178 tmp = alloc_page(GFP_TRY); 179 if (!tmp) 180 break; 181 set_page_private(tmp, (unsigned long)page); 182 page = tmp; 183 } 184 185 if (i == number) 186 return page; 187 188 /* Not enough pages immediately available this time. 189 * No need to jump around here, drbd_alloc_pages will retry this 190 * function "soon". */ 191 if (page) { 192 tmp = page_chain_tail(page, NULL); 193 spin_lock(&drbd_pp_lock); 194 page_chain_add(&drbd_pp_pool, page, tmp); 195 drbd_pp_vacant += i; 196 spin_unlock(&drbd_pp_lock); 197 } 198 return NULL; 199 } 200 201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device, 202 struct list_head *to_be_freed) 203 { 204 struct drbd_peer_request *peer_req, *tmp; 205 206 /* The EEs are always appended to the end of the list. Since 207 they are sent in order over the wire, they have to finish 208 in order. As soon as we see the first not finished we can 209 stop to examine the list... 
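	   These are peer requests whose pages were handed off to the
	   network stack for sending; they stay on net_ee until
	   drbd_peer_req_has_active_page() no longer finds a page that is
	   presumably still referenced by the transport.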
	*/

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);
	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		if (!atomic_read(&device->pp_in_use_by_net))
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_reclaim_net_peer_reqs(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device.
 * @number: number of pages requested
 * @retry: whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum-based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (has_payload && data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages,
					gfpflags_allow_blocking(gfp_mask));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
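	 * For application writes it typically is the peer's request pointer
	 * (compare find_request()), for resync writes it is ID_SYNCER.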
401 */ 402 peer_req->block_id = id; 403 404 return peer_req; 405 406 fail: 407 mempool_free(peer_req, drbd_ee_mempool); 408 return NULL; 409 } 410 411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req, 412 int is_net) 413 { 414 might_sleep(); 415 if (peer_req->flags & EE_HAS_DIGEST) 416 kfree(peer_req->digest); 417 drbd_free_pages(device, peer_req->pages, is_net); 418 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); 419 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 420 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { 421 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 422 drbd_al_complete_io(device, &peer_req->i); 423 } 424 mempool_free(peer_req, drbd_ee_mempool); 425 } 426 427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) 428 { 429 LIST_HEAD(work_list); 430 struct drbd_peer_request *peer_req, *t; 431 int count = 0; 432 int is_net = list == &device->net_ee; 433 434 spin_lock_irq(&device->resource->req_lock); 435 list_splice_init(list, &work_list); 436 spin_unlock_irq(&device->resource->req_lock); 437 438 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 439 __drbd_free_peer_req(device, peer_req, is_net); 440 count++; 441 } 442 return count; 443 } 444 445 /* 446 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier. 447 */ 448 static int drbd_finish_peer_reqs(struct drbd_device *device) 449 { 450 LIST_HEAD(work_list); 451 LIST_HEAD(reclaimed); 452 struct drbd_peer_request *peer_req, *t; 453 int err = 0; 454 455 spin_lock_irq(&device->resource->req_lock); 456 reclaim_finished_net_peer_reqs(device, &reclaimed); 457 list_splice_init(&device->done_ee, &work_list); 458 spin_unlock_irq(&device->resource->req_lock); 459 460 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) 461 drbd_free_net_peer_req(device, peer_req); 462 463 /* possible callbacks here: 464 * e_end_block, and e_end_resync_block, e_send_superseded. 465 * all ignore the last argument. 466 */ 467 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 468 int err2; 469 470 /* list_del not necessary, next/prev members not touched */ 471 err2 = peer_req->w.cb(&peer_req->w, !!err); 472 if (!err) 473 err = err2; 474 drbd_free_peer_req(device, peer_req); 475 } 476 wake_up(&device->ee_wait); 477 478 return err; 479 } 480 481 static void _drbd_wait_ee_list_empty(struct drbd_device *device, 482 struct list_head *head) 483 { 484 DEFINE_WAIT(wait); 485 486 /* avoids spin_lock/unlock 487 * and calling prepare_to_wait in the fast path */ 488 while (!list_empty(head)) { 489 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE); 490 spin_unlock_irq(&device->resource->req_lock); 491 io_schedule(); 492 finish_wait(&device->ee_wait, &wait); 493 spin_lock_irq(&device->resource->req_lock); 494 } 495 } 496 497 static void drbd_wait_ee_list_empty(struct drbd_device *device, 498 struct list_head *head) 499 { 500 spin_lock_irq(&device->resource->req_lock); 501 _drbd_wait_ee_list_empty(device, head); 502 spin_unlock_irq(&device->resource->req_lock); 503 } 504 505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags) 506 { 507 struct kvec iov = { 508 .iov_base = buf, 509 .iov_len = size, 510 }; 511 struct msghdr msg = { 512 .msg_flags = (flags ? 
flags : MSG_WAITALL | MSG_NOSIGNAL) 513 }; 514 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags); 515 } 516 517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size) 518 { 519 int rv; 520 521 rv = drbd_recv_short(connection->data.socket, buf, size, 0); 522 523 if (rv < 0) { 524 if (rv == -ECONNRESET) 525 drbd_info(connection, "sock was reset by peer\n"); 526 else if (rv != -ERESTARTSYS) 527 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 528 } else if (rv == 0) { 529 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 530 long t; 531 rcu_read_lock(); 532 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 533 rcu_read_unlock(); 534 535 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t); 536 537 if (t) 538 goto out; 539 } 540 drbd_info(connection, "sock was shut down by peer\n"); 541 } 542 543 if (rv != size) 544 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD); 545 546 out: 547 return rv; 548 } 549 550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size) 551 { 552 int err; 553 554 err = drbd_recv(connection, buf, size); 555 if (err != size) { 556 if (err >= 0) 557 err = -EIO; 558 } else 559 err = 0; 560 return err; 561 } 562 563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size) 564 { 565 int err; 566 567 err = drbd_recv_all(connection, buf, size); 568 if (err && !signal_pending(current)) 569 drbd_warn(connection, "short read (expected size %d)\n", (int)size); 570 return err; 571 } 572 573 /* quoting tcp(7): 574 * On individual connections, the socket buffer size must be set prior to the 575 * listen(2) or connect(2) calls in order to have it take effect. 576 * This is our wrapper to do so. 
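 *
 * Note: setting SOCK_SNDBUF_LOCK / SOCK_RCVBUF_LOCK below should also
 * keep the kernel's buffer auto-tuning from overriding the configured
 * sizes later on.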
577 */ 578 static void drbd_setbufsize(struct socket *sock, unsigned int snd, 579 unsigned int rcv) 580 { 581 /* open coded SO_SNDBUF, SO_RCVBUF */ 582 if (snd) { 583 sock->sk->sk_sndbuf = snd; 584 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 585 } 586 if (rcv) { 587 sock->sk->sk_rcvbuf = rcv; 588 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 589 } 590 } 591 592 static struct socket *drbd_try_connect(struct drbd_connection *connection) 593 { 594 const char *what; 595 struct socket *sock; 596 struct sockaddr_in6 src_in6; 597 struct sockaddr_in6 peer_in6; 598 struct net_conf *nc; 599 int err, peer_addr_len, my_addr_len; 600 int sndbuf_size, rcvbuf_size, connect_int; 601 int disconnect_on_error = 1; 602 603 rcu_read_lock(); 604 nc = rcu_dereference(connection->net_conf); 605 if (!nc) { 606 rcu_read_unlock(); 607 return NULL; 608 } 609 sndbuf_size = nc->sndbuf_size; 610 rcvbuf_size = nc->rcvbuf_size; 611 connect_int = nc->connect_int; 612 rcu_read_unlock(); 613 614 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6)); 615 memcpy(&src_in6, &connection->my_addr, my_addr_len); 616 617 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6) 618 src_in6.sin6_port = 0; 619 else 620 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ 621 622 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6)); 623 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len); 624 625 what = "sock_create_kern"; 626 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family, 627 SOCK_STREAM, IPPROTO_TCP, &sock); 628 if (err < 0) { 629 sock = NULL; 630 goto out; 631 } 632 633 sock->sk->sk_rcvtimeo = 634 sock->sk->sk_sndtimeo = connect_int * HZ; 635 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size); 636 637 /* explicitly bind to the configured IP as source IP 638 * for the outgoing connections. 639 * This is needed for multihomed hosts and to be 640 * able to use lo: interfaces for drbd. 641 * Make sure to use 0 as port number, so linux selects 642 * a free one dynamically. 643 */ 644 what = "bind before connect"; 645 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len); 646 if (err < 0) 647 goto out; 648 649 /* connect may fail, peer not yet available. 650 * stay C_WF_CONNECTION, don't go Disconnecting! 
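	 * Benign connect errors are filtered in the switch below; only
	 * unexpected failures request C_DISCONNECTING.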
*/ 651 disconnect_on_error = 0; 652 what = "connect"; 653 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0); 654 655 out: 656 if (err < 0) { 657 if (sock) { 658 sock_release(sock); 659 sock = NULL; 660 } 661 switch (-err) { 662 /* timeout, busy, signal pending */ 663 case ETIMEDOUT: case EAGAIN: case EINPROGRESS: 664 case EINTR: case ERESTARTSYS: 665 /* peer not (yet) available, network problem */ 666 case ECONNREFUSED: case ENETUNREACH: 667 case EHOSTDOWN: case EHOSTUNREACH: 668 disconnect_on_error = 0; 669 break; 670 default: 671 drbd_err(connection, "%s failed, err = %d\n", what, err); 672 } 673 if (disconnect_on_error) 674 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 675 } 676 677 return sock; 678 } 679 680 struct accept_wait_data { 681 struct drbd_connection *connection; 682 struct socket *s_listen; 683 struct completion door_bell; 684 void (*original_sk_state_change)(struct sock *sk); 685 686 }; 687 688 static void drbd_incoming_connection(struct sock *sk) 689 { 690 struct accept_wait_data *ad = sk->sk_user_data; 691 void (*state_change)(struct sock *sk); 692 693 state_change = ad->original_sk_state_change; 694 if (sk->sk_state == TCP_ESTABLISHED) 695 complete(&ad->door_bell); 696 state_change(sk); 697 } 698 699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad) 700 { 701 int err, sndbuf_size, rcvbuf_size, my_addr_len; 702 struct sockaddr_in6 my_addr; 703 struct socket *s_listen; 704 struct net_conf *nc; 705 const char *what; 706 707 rcu_read_lock(); 708 nc = rcu_dereference(connection->net_conf); 709 if (!nc) { 710 rcu_read_unlock(); 711 return -EIO; 712 } 713 sndbuf_size = nc->sndbuf_size; 714 rcvbuf_size = nc->rcvbuf_size; 715 rcu_read_unlock(); 716 717 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6)); 718 memcpy(&my_addr, &connection->my_addr, my_addr_len); 719 720 what = "sock_create_kern"; 721 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family, 722 SOCK_STREAM, IPPROTO_TCP, &s_listen); 723 if (err) { 724 s_listen = NULL; 725 goto out; 726 } 727 728 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 729 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size); 730 731 what = "bind before listen"; 732 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len); 733 if (err < 0) 734 goto out; 735 736 ad->s_listen = s_listen; 737 write_lock_bh(&s_listen->sk->sk_callback_lock); 738 ad->original_sk_state_change = s_listen->sk->sk_state_change; 739 s_listen->sk->sk_state_change = drbd_incoming_connection; 740 s_listen->sk->sk_user_data = ad; 741 write_unlock_bh(&s_listen->sk->sk_callback_lock); 742 743 what = "listen"; 744 err = s_listen->ops->listen(s_listen, 5); 745 if (err < 0) 746 goto out; 747 748 return 0; 749 out: 750 if (s_listen) 751 sock_release(s_listen); 752 if (err < 0) { 753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 754 drbd_err(connection, "%s failed, err = %d\n", what, err); 755 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 756 } 757 } 758 759 return -EIO; 760 } 761 762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad) 763 { 764 write_lock_bh(&sk->sk_callback_lock); 765 sk->sk_state_change = ad->original_sk_state_change; 766 sk->sk_user_data = NULL; 767 write_unlock_bh(&sk->sk_callback_lock); 768 } 769 770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad) 771 { 772 
int timeo, connect_int, err = 0; 773 struct socket *s_estab = NULL; 774 struct net_conf *nc; 775 776 rcu_read_lock(); 777 nc = rcu_dereference(connection->net_conf); 778 if (!nc) { 779 rcu_read_unlock(); 780 return NULL; 781 } 782 connect_int = nc->connect_int; 783 rcu_read_unlock(); 784 785 timeo = connect_int * HZ; 786 /* 28.5% random jitter */ 787 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7; 788 789 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo); 790 if (err <= 0) 791 return NULL; 792 793 err = kernel_accept(ad->s_listen, &s_estab, 0); 794 if (err < 0) { 795 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 796 drbd_err(connection, "accept failed, err = %d\n", err); 797 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 798 } 799 } 800 801 if (s_estab) 802 unregister_state_change(s_estab->sk, ad); 803 804 return s_estab; 805 } 806 807 static int decode_header(struct drbd_connection *, void *, struct packet_info *); 808 809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock, 810 enum drbd_packet cmd) 811 { 812 if (!conn_prepare_command(connection, sock)) 813 return -EIO; 814 return conn_send_command(connection, sock, cmd, 0, NULL, 0); 815 } 816 817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock) 818 { 819 unsigned int header_size = drbd_header_size(connection); 820 struct packet_info pi; 821 struct net_conf *nc; 822 int err; 823 824 rcu_read_lock(); 825 nc = rcu_dereference(connection->net_conf); 826 if (!nc) { 827 rcu_read_unlock(); 828 return -EIO; 829 } 830 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10; 831 rcu_read_unlock(); 832 833 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0); 834 if (err != header_size) { 835 if (err >= 0) 836 err = -EIO; 837 return err; 838 } 839 err = decode_header(connection, connection->data.rbuf, &pi); 840 if (err) 841 return err; 842 return pi.cmd; 843 } 844 845 /** 846 * drbd_socket_okay() - Free the socket if its connection is not okay 847 * @sock: pointer to the pointer to the socket. 848 */ 849 static bool drbd_socket_okay(struct socket **sock) 850 { 851 int rr; 852 char tb[4]; 853 854 if (!*sock) 855 return false; 856 857 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); 858 859 if (rr > 0 || rr == -EAGAIN) { 860 return true; 861 } else { 862 sock_release(*sock); 863 *sock = NULL; 864 return false; 865 } 866 } 867 868 static bool connection_established(struct drbd_connection *connection, 869 struct socket **sock1, 870 struct socket **sock2) 871 { 872 struct net_conf *nc; 873 int timeout; 874 bool ok; 875 876 if (!*sock1 || !*sock2) 877 return false; 878 879 rcu_read_lock(); 880 nc = rcu_dereference(connection->net_conf); 881 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10; 882 rcu_read_unlock(); 883 schedule_timeout_interruptible(timeout); 884 885 ok = drbd_socket_okay(sock1); 886 ok = drbd_socket_okay(sock2) && ok; 887 888 return ok; 889 } 890 891 /* Gets called if a connection is established, or if a new minor gets created 892 in a connection */ 893 int drbd_connected(struct drbd_peer_device *peer_device) 894 { 895 struct drbd_device *device = peer_device->device; 896 int err; 897 898 atomic_set(&device->packet_seq, 0); 899 device->peer_seq = 0; 900 901 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ? 
902 &peer_device->connection->cstate_mutex : 903 &device->own_state_mutex; 904 905 err = drbd_send_sync_param(peer_device); 906 if (!err) 907 err = drbd_send_sizes(peer_device, 0, 0); 908 if (!err) 909 err = drbd_send_uuids(peer_device); 910 if (!err) 911 err = drbd_send_current_state(peer_device); 912 clear_bit(USE_DEGR_WFC_T, &device->flags); 913 clear_bit(RESIZE_PENDING, &device->flags); 914 atomic_set(&device->ap_in_flight, 0); 915 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */ 916 return err; 917 } 918 919 /* 920 * return values: 921 * 1 yes, we have a valid connection 922 * 0 oops, did not work out, please try again 923 * -1 peer talks different language, 924 * no point in trying again, please go standalone. 925 * -2 We do not have a network config... 926 */ 927 static int conn_connect(struct drbd_connection *connection) 928 { 929 struct drbd_socket sock, msock; 930 struct drbd_peer_device *peer_device; 931 struct net_conf *nc; 932 int vnr, timeout, h; 933 bool discard_my_data, ok; 934 enum drbd_state_rv rv; 935 struct accept_wait_data ad = { 936 .connection = connection, 937 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell), 938 }; 939 940 clear_bit(DISCONNECT_SENT, &connection->flags); 941 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS) 942 return -2; 943 944 mutex_init(&sock.mutex); 945 sock.sbuf = connection->data.sbuf; 946 sock.rbuf = connection->data.rbuf; 947 sock.socket = NULL; 948 mutex_init(&msock.mutex); 949 msock.sbuf = connection->meta.sbuf; 950 msock.rbuf = connection->meta.rbuf; 951 msock.socket = NULL; 952 953 /* Assume that the peer only understands protocol 80 until we know better. */ 954 connection->agreed_pro_version = 80; 955 956 if (prepare_listen_socket(connection, &ad)) 957 return 0; 958 959 do { 960 struct socket *s; 961 962 s = drbd_try_connect(connection); 963 if (s) { 964 if (!sock.socket) { 965 sock.socket = s; 966 send_first_packet(connection, &sock, P_INITIAL_DATA); 967 } else if (!msock.socket) { 968 clear_bit(RESOLVE_CONFLICTS, &connection->flags); 969 msock.socket = s; 970 send_first_packet(connection, &msock, P_INITIAL_META); 971 } else { 972 drbd_err(connection, "Logic error in conn_connect()\n"); 973 goto out_release_sockets; 974 } 975 } 976 977 if (connection_established(connection, &sock.socket, &msock.socket)) 978 break; 979 980 retry: 981 s = drbd_wait_for_connect(connection, &ad); 982 if (s) { 983 int fp = receive_first_packet(connection, s); 984 drbd_socket_okay(&sock.socket); 985 drbd_socket_okay(&msock.socket); 986 switch (fp) { 987 case P_INITIAL_DATA: 988 if (sock.socket) { 989 drbd_warn(connection, "initial packet S crossed\n"); 990 sock_release(sock.socket); 991 sock.socket = s; 992 goto randomize; 993 } 994 sock.socket = s; 995 break; 996 case P_INITIAL_META: 997 set_bit(RESOLVE_CONFLICTS, &connection->flags); 998 if (msock.socket) { 999 drbd_warn(connection, "initial packet M crossed\n"); 1000 sock_release(msock.socket); 1001 msock.socket = s; 1002 goto randomize; 1003 } 1004 msock.socket = s; 1005 break; 1006 default: 1007 drbd_warn(connection, "Error receiving initial packet\n"); 1008 sock_release(s); 1009 randomize: 1010 if (prandom_u32() & 1) 1011 goto retry; 1012 } 1013 } 1014 1015 if (connection->cstate <= C_DISCONNECTING) 1016 goto out_release_sockets; 1017 if (signal_pending(current)) { 1018 flush_signals(current); 1019 smp_rmb(); 1020 if (get_t_state(&connection->receiver) == EXITING) 1021 goto out_release_sockets; 1022 } 1023 1024 ok = 
connection_established(connection, &sock.socket, &msock.socket); 1025 } while (!ok); 1026 1027 if (ad.s_listen) 1028 sock_release(ad.s_listen); 1029 1030 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 1031 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 1032 1033 sock.socket->sk->sk_allocation = GFP_NOIO; 1034 msock.socket->sk->sk_allocation = GFP_NOIO; 1035 1036 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; 1037 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE; 1038 1039 /* NOT YET ... 1040 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10; 1041 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 1042 * first set it to the P_CONNECTION_FEATURES timeout, 1043 * which we set to 4x the configured ping_timeout. */ 1044 rcu_read_lock(); 1045 nc = rcu_dereference(connection->net_conf); 1046 1047 sock.socket->sk->sk_sndtimeo = 1048 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10; 1049 1050 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ; 1051 timeout = nc->timeout * HZ / 10; 1052 discard_my_data = nc->discard_my_data; 1053 rcu_read_unlock(); 1054 1055 msock.socket->sk->sk_sndtimeo = timeout; 1056 1057 /* we don't want delays. 1058 * we use TCP_CORK where appropriate, though */ 1059 drbd_tcp_nodelay(sock.socket); 1060 drbd_tcp_nodelay(msock.socket); 1061 1062 connection->data.socket = sock.socket; 1063 connection->meta.socket = msock.socket; 1064 connection->last_received = jiffies; 1065 1066 h = drbd_do_features(connection); 1067 if (h <= 0) 1068 return h; 1069 1070 if (connection->cram_hmac_tfm) { 1071 /* drbd_request_state(device, NS(conn, WFAuth)); */ 1072 switch (drbd_do_auth(connection)) { 1073 case -1: 1074 drbd_err(connection, "Authentication of peer failed\n"); 1075 return -1; 1076 case 0: 1077 drbd_err(connection, "Authentication of peer failed, trying again.\n"); 1078 return 0; 1079 } 1080 } 1081 1082 connection->data.socket->sk->sk_sndtimeo = timeout; 1083 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 1084 1085 if (drbd_send_protocol(connection) == -EOPNOTSUPP) 1086 return -1; 1087 1088 /* Prevent a race between resync-handshake and 1089 * being promoted to Primary. 1090 * 1091 * Grab and release the state mutex, so we know that any current 1092 * drbd_set_role() is finished, and any incoming drbd_set_role 1093 * will see the STATE_SENT flag, and wait for it to be cleared. 
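	 *
	 * Concretely: take every volume's state mutex, set STATE_SENT,
	 * then drop the mutexes again before sending the actual states.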
1094 */ 1095 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 1096 mutex_lock(peer_device->device->state_mutex); 1097 1098 set_bit(STATE_SENT, &connection->flags); 1099 1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 1101 mutex_unlock(peer_device->device->state_mutex); 1102 1103 rcu_read_lock(); 1104 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1105 struct drbd_device *device = peer_device->device; 1106 kref_get(&device->kref); 1107 rcu_read_unlock(); 1108 1109 if (discard_my_data) 1110 set_bit(DISCARD_MY_DATA, &device->flags); 1111 else 1112 clear_bit(DISCARD_MY_DATA, &device->flags); 1113 1114 drbd_connected(peer_device); 1115 kref_put(&device->kref, drbd_destroy_device); 1116 rcu_read_lock(); 1117 } 1118 rcu_read_unlock(); 1119 1120 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE); 1121 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) { 1122 clear_bit(STATE_SENT, &connection->flags); 1123 return 0; 1124 } 1125 1126 drbd_thread_start(&connection->ack_receiver); 1127 /* opencoded create_singlethread_workqueue(), 1128 * to be able to use format string arguments */ 1129 connection->ack_sender = 1130 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name); 1131 if (!connection->ack_sender) { 1132 drbd_err(connection, "Failed to create workqueue ack_sender\n"); 1133 return 0; 1134 } 1135 1136 mutex_lock(&connection->resource->conf_update); 1137 /* The discard_my_data flag is a single-shot modifier to the next 1138 * connection attempt, the handshake of which is now well underway. 1139 * No need for rcu style copying of the whole struct 1140 * just to clear a single value. */ 1141 connection->net_conf->discard_my_data = 0; 1142 mutex_unlock(&connection->resource->conf_update); 1143 1144 return h; 1145 1146 out_release_sockets: 1147 if (ad.s_listen) 1148 sock_release(ad.s_listen); 1149 if (sock.socket) 1150 sock_release(sock.socket); 1151 if (msock.socket) 1152 sock_release(msock.socket); 1153 return -1; 1154 } 1155 1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi) 1157 { 1158 unsigned int header_size = drbd_header_size(connection); 1159 1160 if (header_size == sizeof(struct p_header100) && 1161 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) { 1162 struct p_header100 *h = header; 1163 if (h->pad != 0) { 1164 drbd_err(connection, "Header padding is not zero\n"); 1165 return -EINVAL; 1166 } 1167 pi->vnr = be16_to_cpu(h->volume); 1168 pi->cmd = be16_to_cpu(h->command); 1169 pi->size = be32_to_cpu(h->length); 1170 } else if (header_size == sizeof(struct p_header95) && 1171 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) { 1172 struct p_header95 *h = header; 1173 pi->cmd = be16_to_cpu(h->command); 1174 pi->size = be32_to_cpu(h->length); 1175 pi->vnr = 0; 1176 } else if (header_size == sizeof(struct p_header80) && 1177 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) { 1178 struct p_header80 *h = header; 1179 pi->cmd = be16_to_cpu(h->command); 1180 pi->size = be16_to_cpu(h->length); 1181 pi->vnr = 0; 1182 } else { 1183 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n", 1184 be32_to_cpu(*(__be32 *)header), 1185 connection->agreed_pro_version); 1186 return -EINVAL; 1187 } 1188 pi->data = header + header_size; 1189 return 0; 1190 } 1191 1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi) 1193 { 1194 void *buffer = connection->data.rbuf; 1195 int err; 1196 1197 
err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection)); 1198 if (err) 1199 return err; 1200 1201 err = decode_header(connection, buffer, pi); 1202 connection->last_received = jiffies; 1203 1204 return err; 1205 } 1206 1207 static void drbd_flush(struct drbd_connection *connection) 1208 { 1209 int rv; 1210 struct drbd_peer_device *peer_device; 1211 int vnr; 1212 1213 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) { 1214 rcu_read_lock(); 1215 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1216 struct drbd_device *device = peer_device->device; 1217 1218 if (!get_ldev(device)) 1219 continue; 1220 kref_get(&device->kref); 1221 rcu_read_unlock(); 1222 1223 /* Right now, we have only this one synchronous code path 1224 * for flushes between request epochs. 1225 * We may want to make those asynchronous, 1226 * or at least parallelize the flushes to the volume devices. 1227 */ 1228 device->flush_jif = jiffies; 1229 set_bit(FLUSH_PENDING, &device->flags); 1230 rv = blkdev_issue_flush(device->ldev->backing_bdev, 1231 GFP_NOIO, NULL); 1232 clear_bit(FLUSH_PENDING, &device->flags); 1233 if (rv) { 1234 drbd_info(device, "local disk flush failed with status %d\n", rv); 1235 /* would rather check on EOPNOTSUPP, but that is not reliable. 1236 * don't try again for ANY return value != 0 1237 * if (rv == -EOPNOTSUPP) */ 1238 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO); 1239 } 1240 put_ldev(device); 1241 kref_put(&device->kref, drbd_destroy_device); 1242 1243 rcu_read_lock(); 1244 if (rv) 1245 break; 1246 } 1247 rcu_read_unlock(); 1248 } 1249 } 1250 1251 /** 1252 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. 1253 * @device: DRBD device. 1254 * @epoch: Epoch object. 1255 * @ev: Epoch event. 1256 */ 1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection, 1258 struct drbd_epoch *epoch, 1259 enum epoch_event ev) 1260 { 1261 int epoch_size; 1262 struct drbd_epoch *next_epoch; 1263 enum finish_epoch rv = FE_STILL_LIVE; 1264 1265 spin_lock(&connection->epoch_lock); 1266 do { 1267 next_epoch = NULL; 1268 1269 epoch_size = atomic_read(&epoch->epoch_size); 1270 1271 switch (ev & ~EV_CLEANUP) { 1272 case EV_PUT: 1273 atomic_dec(&epoch->active); 1274 break; 1275 case EV_GOT_BARRIER_NR: 1276 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); 1277 break; 1278 case EV_BECAME_LAST: 1279 /* nothing to do*/ 1280 break; 1281 } 1282 1283 if (epoch_size != 0 && 1284 atomic_read(&epoch->active) == 0 && 1285 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) { 1286 if (!(ev & EV_CLEANUP)) { 1287 spin_unlock(&connection->epoch_lock); 1288 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size); 1289 spin_lock(&connection->epoch_lock); 1290 } 1291 #if 0 1292 /* FIXME: dec unacked on connection, once we have 1293 * something to count pending connection packets in. 
			 */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}

static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct disk_conf *dc;

	dc = rcu_dereference(bdev->disk_conf);

	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
		wo = WO_DRAIN_IO;
	if (wo == WO_DRAIN_IO && !dc->disk_drain)
		wo = WO_NONE;

	return wo;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource: DRBD resource.
 * @bdev: backing device to consider in addition to the attached ones, may be NULL
 * @wo: Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
			      enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_NONE] = "none",
		[WO_DRAIN_IO] = "drain",
		[WO_BDEV_FLUSH] = "flush",
	};

	pwo = resource->write_ordering;
	if (wo != WO_BDEV_FLUSH)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}

/**
 * drbd_submit_peer_request() - submit a peer request to the local backing device
 * @device: DRBD device.
 * @peer_req: peer request
 * @rw: flag field, see bio->bi_rw
 * @fault_type: DRBD fault injection type, e.g. DRBD_FAULT_RS_WR
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 * single page to an empty bio (which should never happen and likely indicates
 * that the lower level IO stack is in some way broken). This has been observed
 * on certain Xen deployments.
 */
/* TODO allocate from our own bio_set.
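 * That would presumably mean a mempool backed bio_alloc_bioset()
 * instead of the plain bio_alloc() below, guaranteeing forward
 * progress even under memory pressure.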
*/ 1399 int drbd_submit_peer_request(struct drbd_device *device, 1400 struct drbd_peer_request *peer_req, 1401 const unsigned rw, const int fault_type) 1402 { 1403 struct bio *bios = NULL; 1404 struct bio *bio; 1405 struct page *page = peer_req->pages; 1406 sector_t sector = peer_req->i.sector; 1407 unsigned data_size = peer_req->i.size; 1408 unsigned n_bios = 0; 1409 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; 1410 int err = -ENOMEM; 1411 1412 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) { 1413 /* wait for all pending IO completions, before we start 1414 * zeroing things out. */ 1415 conn_wait_active_ee_empty(peer_req->peer_device->connection); 1416 /* add it to the active list now, 1417 * so we can find it to present it in debugfs */ 1418 peer_req->submit_jif = jiffies; 1419 peer_req->flags |= EE_SUBMITTED; 1420 spin_lock_irq(&device->resource->req_lock); 1421 list_add_tail(&peer_req->w.list, &device->active_ee); 1422 spin_unlock_irq(&device->resource->req_lock); 1423 if (blkdev_issue_zeroout(device->ldev->backing_bdev, 1424 sector, data_size >> 9, GFP_NOIO, false)) 1425 peer_req->flags |= EE_WAS_ERROR; 1426 drbd_endio_write_sec_final(peer_req); 1427 return 0; 1428 } 1429 1430 /* Discards don't have any payload. 1431 * But the scsi layer still expects a bio_vec it can use internally, 1432 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */ 1433 if (peer_req->flags & EE_IS_TRIM) 1434 nr_pages = 1; 1435 1436 /* In most cases, we will only need one bio. But in case the lower 1437 * level restrictions happen to be different at this offset on this 1438 * side than those of the sending peer, we may need to submit the 1439 * request in more than one bio. 1440 * 1441 * Plain bio_alloc is good enough here, this is no DRBD internally 1442 * generated bio, but a bio allocated on behalf of the peer. 1443 */ 1444 next_bio: 1445 bio = bio_alloc(GFP_NOIO, nr_pages); 1446 if (!bio) { 1447 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages); 1448 goto fail; 1449 } 1450 /* > peer_req->i.sector, unless this is the first bio */ 1451 bio->bi_iter.bi_sector = sector; 1452 bio->bi_bdev = device->ldev->backing_bdev; 1453 bio->bi_rw = rw; 1454 bio->bi_private = peer_req; 1455 bio->bi_end_io = drbd_peer_request_endio; 1456 1457 bio->bi_next = bios; 1458 bios = bio; 1459 ++n_bios; 1460 1461 if (rw & REQ_DISCARD) { 1462 bio->bi_iter.bi_size = data_size; 1463 goto submit; 1464 } 1465 1466 page_chain_for_each(page) { 1467 unsigned len = min_t(unsigned, data_size, PAGE_SIZE); 1468 if (!bio_add_page(bio, page, len, 0)) { 1469 /* A single page must always be possible! 1470 * But in case it fails anyways, 1471 * we deal with it, and complain (below). 
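			 * (This is the -ENOSPC case described in the
			 * comment above this function.)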
*/ 1472 if (bio->bi_vcnt == 0) { 1473 drbd_err(device, 1474 "bio_add_page failed for len=%u, " 1475 "bi_vcnt=0 (bi_sector=%llu)\n", 1476 len, (uint64_t)bio->bi_iter.bi_sector); 1477 err = -ENOSPC; 1478 goto fail; 1479 } 1480 goto next_bio; 1481 } 1482 data_size -= len; 1483 sector += len >> 9; 1484 --nr_pages; 1485 } 1486 D_ASSERT(device, data_size == 0); 1487 submit: 1488 D_ASSERT(device, page == NULL); 1489 1490 atomic_set(&peer_req->pending_bios, n_bios); 1491 /* for debugfs: update timestamp, mark as submitted */ 1492 peer_req->submit_jif = jiffies; 1493 peer_req->flags |= EE_SUBMITTED; 1494 do { 1495 bio = bios; 1496 bios = bios->bi_next; 1497 bio->bi_next = NULL; 1498 1499 drbd_generic_make_request(device, fault_type, bio); 1500 } while (bios); 1501 return 0; 1502 1503 fail: 1504 while (bios) { 1505 bio = bios; 1506 bios = bios->bi_next; 1507 bio_put(bio); 1508 } 1509 return err; 1510 } 1511 1512 static void drbd_remove_epoch_entry_interval(struct drbd_device *device, 1513 struct drbd_peer_request *peer_req) 1514 { 1515 struct drbd_interval *i = &peer_req->i; 1516 1517 drbd_remove_interval(&device->write_requests, i); 1518 drbd_clear_interval(i); 1519 1520 /* Wake up any processes waiting for this peer request to complete. */ 1521 if (i->waiting) 1522 wake_up(&device->misc_wait); 1523 } 1524 1525 static void conn_wait_active_ee_empty(struct drbd_connection *connection) 1526 { 1527 struct drbd_peer_device *peer_device; 1528 int vnr; 1529 1530 rcu_read_lock(); 1531 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1532 struct drbd_device *device = peer_device->device; 1533 1534 kref_get(&device->kref); 1535 rcu_read_unlock(); 1536 drbd_wait_ee_list_empty(device, &device->active_ee); 1537 kref_put(&device->kref, drbd_destroy_device); 1538 rcu_read_lock(); 1539 } 1540 rcu_read_unlock(); 1541 } 1542 1543 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi) 1544 { 1545 int rv; 1546 struct p_barrier *p = pi->data; 1547 struct drbd_epoch *epoch; 1548 1549 /* FIXME these are unacked on connection, 1550 * not a specific (peer)device. 1551 */ 1552 connection->current_epoch->barrier_nr = p->barrier; 1553 connection->current_epoch->connection = connection; 1554 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR); 1555 1556 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from 1557 * the activity log, which means it would not be resynced in case the 1558 * R_PRIMARY crashes now. 1559 * Therefore we must send the barrier_ack after the barrier request was 1560 * completed. */ 1561 switch (connection->resource->write_ordering) { 1562 case WO_NONE: 1563 if (rv == FE_RECYCLED) 1564 return 0; 1565 1566 /* receiver context, in the writeout path of the other node. 
1567 * avoid potential distributed deadlock */ 1568 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1569 if (epoch) 1570 break; 1571 else 1572 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n"); 1573 /* Fall through */ 1574 1575 case WO_BDEV_FLUSH: 1576 case WO_DRAIN_IO: 1577 conn_wait_active_ee_empty(connection); 1578 drbd_flush(connection); 1579 1580 if (atomic_read(&connection->current_epoch->epoch_size)) { 1581 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1582 if (epoch) 1583 break; 1584 } 1585 1586 return 0; 1587 default: 1588 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", 1589 connection->resource->write_ordering); 1590 return -EIO; 1591 } 1592 1593 epoch->flags = 0; 1594 atomic_set(&epoch->epoch_size, 0); 1595 atomic_set(&epoch->active, 0); 1596 1597 spin_lock(&connection->epoch_lock); 1598 if (atomic_read(&connection->current_epoch->epoch_size)) { 1599 list_add(&epoch->list, &connection->current_epoch->list); 1600 connection->current_epoch = epoch; 1601 connection->epochs++; 1602 } else { 1603 /* The current_epoch got recycled while we allocated this one... */ 1604 kfree(epoch); 1605 } 1606 spin_unlock(&connection->epoch_lock); 1607 1608 return 0; 1609 } 1610 1611 /* used from receive_RSDataReply (recv_resync_read) 1612 * and from receive_Data */ 1613 static struct drbd_peer_request * 1614 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 1615 struct packet_info *pi) __must_hold(local) 1616 { 1617 struct drbd_device *device = peer_device->device; 1618 const sector_t capacity = drbd_get_capacity(device->this_bdev); 1619 struct drbd_peer_request *peer_req; 1620 struct page *page; 1621 int digest_size, err; 1622 unsigned int data_size = pi->size, ds; 1623 void *dig_in = peer_device->connection->int_dig_in; 1624 void *dig_vv = peer_device->connection->int_dig_vv; 1625 unsigned long *data; 1626 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL; 1627 1628 digest_size = 0; 1629 if (!trim && peer_device->connection->peer_integrity_tfm) { 1630 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm); 1631 /* 1632 * FIXME: Receive the incoming digest into the receive buffer 1633 * here, together with its struct p_data? 1634 */ 1635 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1636 if (err) 1637 return NULL; 1638 data_size -= digest_size; 1639 } 1640 1641 if (trim) { 1642 D_ASSERT(peer_device, data_size == 0); 1643 data_size = be32_to_cpu(trim->size); 1644 } 1645 1646 if (!expect(IS_ALIGNED(data_size, 512))) 1647 return NULL; 1648 /* prepare for larger trim requests. */ 1649 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE)) 1650 return NULL; 1651 1652 /* even though we trust out peer, 1653 * we sometimes have to double check. */ 1654 if (sector + (data_size>>9) > capacity) { 1655 drbd_err(device, "request from peer beyond end of local disk: " 1656 "capacity: %llus < sector: %llus + size: %u\n", 1657 (unsigned long long)capacity, 1658 (unsigned long long)sector, data_size); 1659 return NULL; 1660 } 1661 1662 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 1663 * "criss-cross" setup, that might cause write-out on some other DRBD, 1664 * which in turn might block on the other node at this very place. 
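	 * (Same reasoning as for GFP_TRY in __drbd_alloc_pages() above.)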
*/ 1665 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO); 1666 if (!peer_req) 1667 return NULL; 1668 1669 peer_req->flags |= EE_WRITE; 1670 if (trim) 1671 return peer_req; 1672 1673 ds = data_size; 1674 page = peer_req->pages; 1675 page_chain_for_each(page) { 1676 unsigned len = min_t(int, ds, PAGE_SIZE); 1677 data = kmap(page); 1678 err = drbd_recv_all_warn(peer_device->connection, data, len); 1679 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) { 1680 drbd_err(device, "Fault injection: Corrupting data on receive\n"); 1681 data[0] = data[0] ^ (unsigned long)-1; 1682 } 1683 kunmap(page); 1684 if (err) { 1685 drbd_free_peer_req(device, peer_req); 1686 return NULL; 1687 } 1688 ds -= len; 1689 } 1690 1691 if (digest_size) { 1692 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv); 1693 if (memcmp(dig_in, dig_vv, digest_size)) { 1694 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n", 1695 (unsigned long long)sector, data_size); 1696 drbd_free_peer_req(device, peer_req); 1697 return NULL; 1698 } 1699 } 1700 device->recv_cnt += data_size >> 9; 1701 return peer_req; 1702 } 1703 1704 /* drbd_drain_block() just takes a data block 1705 * out of the socket input buffer, and discards it. 1706 */ 1707 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) 1708 { 1709 struct page *page; 1710 int err = 0; 1711 void *data; 1712 1713 if (!data_size) 1714 return 0; 1715 1716 page = drbd_alloc_pages(peer_device, 1, 1); 1717 1718 data = kmap(page); 1719 while (data_size) { 1720 unsigned int len = min_t(int, data_size, PAGE_SIZE); 1721 1722 err = drbd_recv_all_warn(peer_device->connection, data, len); 1723 if (err) 1724 break; 1725 data_size -= len; 1726 } 1727 kunmap(page); 1728 drbd_free_pages(peer_device->device, page, 0); 1729 return err; 1730 } 1731 1732 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req, 1733 sector_t sector, int data_size) 1734 { 1735 struct bio_vec bvec; 1736 struct bvec_iter iter; 1737 struct bio *bio; 1738 int digest_size, err, expect; 1739 void *dig_in = peer_device->connection->int_dig_in; 1740 void *dig_vv = peer_device->connection->int_dig_vv; 1741 1742 digest_size = 0; 1743 if (peer_device->connection->peer_integrity_tfm) { 1744 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm); 1745 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1746 if (err) 1747 return err; 1748 data_size -= digest_size; 1749 } 1750 1751 /* optimistically update recv_cnt. if receiving fails below, 1752 * we disconnect anyways, and counters will be reset. */ 1753 peer_device->device->recv_cnt += data_size>>9; 1754 1755 bio = req->master_bio; 1756 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector); 1757 1758 bio_for_each_segment(bvec, bio, iter) { 1759 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset; 1760 expect = min_t(int, data_size, bvec.bv_len); 1761 err = drbd_recv_all_warn(peer_device->connection, mapped, expect); 1762 kunmap(bvec.bv_page); 1763 if (err) 1764 return err; 1765 data_size -= expect; 1766 } 1767 1768 if (digest_size) { 1769 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv); 1770 if (memcmp(dig_in, dig_vv, digest_size)) { 1771 drbd_err(peer_device, "Digest integrity check FAILED. 
Broken NICs?\n"); 1772 return -EINVAL; 1773 } 1774 } 1775 1776 D_ASSERT(peer_device->device, data_size == 0); 1777 return 0; 1778 } 1779 1780 /* 1781 * e_end_resync_block() is called in ack_sender context via 1782 * drbd_finish_peer_reqs(). 1783 */ 1784 static int e_end_resync_block(struct drbd_work *w, int unused) 1785 { 1786 struct drbd_peer_request *peer_req = 1787 container_of(w, struct drbd_peer_request, w); 1788 struct drbd_peer_device *peer_device = peer_req->peer_device; 1789 struct drbd_device *device = peer_device->device; 1790 sector_t sector = peer_req->i.sector; 1791 int err; 1792 1793 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1794 1795 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1796 drbd_set_in_sync(device, sector, peer_req->i.size); 1797 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req); 1798 } else { 1799 /* Record failure to sync */ 1800 drbd_rs_failed_io(device, sector, peer_req->i.size); 1801 1802 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1803 } 1804 dec_unacked(device); 1805 1806 return err; 1807 } 1808 1809 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector, 1810 struct packet_info *pi) __releases(local) 1811 { 1812 struct drbd_device *device = peer_device->device; 1813 struct drbd_peer_request *peer_req; 1814 1815 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi); 1816 if (!peer_req) 1817 goto fail; 1818 1819 dec_rs_pending(device); 1820 1821 inc_unacked(device); 1822 /* corresponding dec_unacked() in e_end_resync_block() 1823 * respective _drbd_clear_done_ee */ 1824 1825 peer_req->w.cb = e_end_resync_block; 1826 peer_req->submit_jif = jiffies; 1827 1828 spin_lock_irq(&device->resource->req_lock); 1829 list_add_tail(&peer_req->w.list, &device->sync_ee); 1830 spin_unlock_irq(&device->resource->req_lock); 1831 1832 atomic_add(pi->size >> 9, &device->rs_sect_ev); 1833 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0) 1834 return 0; 1835 1836 /* don't care for the reason here */ 1837 drbd_err(device, "submit failed, triggering re-connect\n"); 1838 spin_lock_irq(&device->resource->req_lock); 1839 list_del(&peer_req->w.list); 1840 spin_unlock_irq(&device->resource->req_lock); 1841 1842 drbd_free_peer_req(device, peer_req); 1843 fail: 1844 put_ldev(device); 1845 return -EIO; 1846 } 1847 1848 static struct drbd_request * 1849 find_request(struct drbd_device *device, struct rb_root *root, u64 id, 1850 sector_t sector, bool missing_ok, const char *func) 1851 { 1852 struct drbd_request *req; 1853 1854 /* Request object according to our peer */ 1855 req = (struct drbd_request *)(unsigned long)id; 1856 if (drbd_contains_interval(root, sector, &req->i) && req->i.local) 1857 return req; 1858 if (!missing_ok) { 1859 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func, 1860 (unsigned long)id, (unsigned long long)sector); 1861 } 1862 return NULL; 1863 } 1864 1865 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi) 1866 { 1867 struct drbd_peer_device *peer_device; 1868 struct drbd_device *device; 1869 struct drbd_request *req; 1870 sector_t sector; 1871 int err; 1872 struct p_data *p = pi->data; 1873 1874 peer_device = conn_peer_device(connection, pi->vnr); 1875 if (!peer_device) 1876 return -EIO; 1877 device = peer_device->device; 1878 1879 sector = be64_to_cpu(p->sector); 1880 1881 spin_lock_irq(&device->resource->req_lock); 1882 req = find_request(device, &device->read_requests, p->block_id, sector, false, 
__func__); 1883 spin_unlock_irq(&device->resource->req_lock); 1884 if (unlikely(!req)) 1885 return -EIO; 1886 1887 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid 1888 * special casing it there for the various failure cases. 1889 * still no race with drbd_fail_pending_reads */ 1890 err = recv_dless_read(peer_device, req, sector, pi->size); 1891 if (!err) 1892 req_mod(req, DATA_RECEIVED); 1893 /* else: nothing. handled from drbd_disconnect... 1894 * I don't think we may complete this just yet 1895 * in case we are "on-disconnect: freeze" */ 1896 1897 return err; 1898 } 1899 1900 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi) 1901 { 1902 struct drbd_peer_device *peer_device; 1903 struct drbd_device *device; 1904 sector_t sector; 1905 int err; 1906 struct p_data *p = pi->data; 1907 1908 peer_device = conn_peer_device(connection, pi->vnr); 1909 if (!peer_device) 1910 return -EIO; 1911 device = peer_device->device; 1912 1913 sector = be64_to_cpu(p->sector); 1914 D_ASSERT(device, p->block_id == ID_SYNCER); 1915 1916 if (get_ldev(device)) { 1917 /* data is submitted to disk within recv_resync_read. 1918 * corresponding put_ldev done below on error, 1919 * or in drbd_peer_request_endio. */ 1920 err = recv_resync_read(peer_device, sector, pi); 1921 } else { 1922 if (__ratelimit(&drbd_ratelimit_state)) 1923 drbd_err(device, "Can not write resync data to local disk.\n"); 1924 1925 err = drbd_drain_block(peer_device, pi->size); 1926 1927 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 1928 } 1929 1930 atomic_add(pi->size >> 9, &device->rs_sect_in); 1931 1932 return err; 1933 } 1934 1935 static void restart_conflicting_writes(struct drbd_device *device, 1936 sector_t sector, int size) 1937 { 1938 struct drbd_interval *i; 1939 struct drbd_request *req; 1940 1941 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 1942 if (!i->local) 1943 continue; 1944 req = container_of(i, struct drbd_request, i); 1945 if (req->rq_state & RQ_LOCAL_PENDING || 1946 !(req->rq_state & RQ_POSTPONED)) 1947 continue; 1948 /* as it is RQ_POSTPONED, this will cause it to 1949 * be queued on the retry workqueue. */ 1950 __req_mod(req, CONFLICT_RESOLVED, NULL); 1951 } 1952 } 1953 1954 /* 1955 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs(). 1956 */ 1957 static int e_end_block(struct drbd_work *w, int cancel) 1958 { 1959 struct drbd_peer_request *peer_req = 1960 container_of(w, struct drbd_peer_request, w); 1961 struct drbd_peer_device *peer_device = peer_req->peer_device; 1962 struct drbd_device *device = peer_device->device; 1963 sector_t sector = peer_req->i.sector; 1964 int err = 0, pcmd; 1965 1966 if (peer_req->flags & EE_SEND_WRITE_ACK) { 1967 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1968 pcmd = (device->state.conn >= C_SYNC_SOURCE && 1969 device->state.conn <= C_PAUSED_SYNC_T && 1970 peer_req->flags & EE_MAY_SET_IN_SYNC) ? 1971 P_RS_WRITE_ACK : P_WRITE_ACK; 1972 err = drbd_send_ack(peer_device, pcmd, peer_req); 1973 if (pcmd == P_RS_WRITE_ACK) 1974 drbd_set_in_sync(device, sector, peer_req->i.size); 1975 } else { 1976 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1977 /* we expect it to be marked out of sync anyways... 1978 * maybe assert this? */ 1979 } 1980 dec_unacked(device); 1981 } 1982 1983 /* we delete from the conflict detection hash _after_ we sent out the 1984 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. 
*/ 1985 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 1986 spin_lock_irq(&device->resource->req_lock); 1987 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 1988 drbd_remove_epoch_entry_interval(device, peer_req); 1989 if (peer_req->flags & EE_RESTART_REQUESTS) 1990 restart_conflicting_writes(device, sector, peer_req->i.size); 1991 spin_unlock_irq(&device->resource->req_lock); 1992 } else 1993 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1994 1995 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 1996 1997 return err; 1998 } 1999 2000 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 2001 { 2002 struct drbd_peer_request *peer_req = 2003 container_of(w, struct drbd_peer_request, w); 2004 struct drbd_peer_device *peer_device = peer_req->peer_device; 2005 int err; 2006 2007 err = drbd_send_ack(peer_device, ack, peer_req); 2008 dec_unacked(peer_device->device); 2009 2010 return err; 2011 } 2012 2013 static int e_send_superseded(struct drbd_work *w, int unused) 2014 { 2015 return e_send_ack(w, P_SUPERSEDED); 2016 } 2017 2018 static int e_send_retry_write(struct drbd_work *w, int unused) 2019 { 2020 struct drbd_peer_request *peer_req = 2021 container_of(w, struct drbd_peer_request, w); 2022 struct drbd_connection *connection = peer_req->peer_device->connection; 2023 2024 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 2025 P_RETRY_WRITE : P_SUPERSEDED); 2026 } 2027 2028 static bool seq_greater(u32 a, u32 b) 2029 { 2030 /* 2031 * We assume 32-bit wrap-around here. 2032 * For 24-bit wrap-around, we would have to shift: 2033 * a <<= 8; b <<= 8; 2034 */ 2035 return (s32)a - (s32)b > 0; 2036 } 2037 2038 static u32 seq_max(u32 a, u32 b) 2039 { 2040 return seq_greater(a, b) ? a : b; 2041 } 2042 2043 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 2044 { 2045 struct drbd_device *device = peer_device->device; 2046 unsigned int newest_peer_seq; 2047 2048 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 2049 spin_lock(&device->peer_seq_lock); 2050 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 2051 device->peer_seq = newest_peer_seq; 2052 spin_unlock(&device->peer_seq_lock); 2053 /* wake up only if we actually changed device->peer_seq */ 2054 if (peer_seq == newest_peer_seq) 2055 wake_up(&device->seq_wait); 2056 } 2057 } 2058 2059 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 2060 { 2061 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 2062 } 2063 2064 /* maybe change sync_ee into interval trees as well? */ 2065 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 2066 { 2067 struct drbd_peer_request *rs_req; 2068 bool rv = 0; 2069 2070 spin_lock_irq(&device->resource->req_lock); 2071 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 2072 if (overlaps(peer_req->i.sector, peer_req->i.size, 2073 rs_req->i.sector, rs_req->i.size)) { 2074 rv = 1; 2075 break; 2076 } 2077 } 2078 spin_unlock_irq(&device->resource->req_lock); 2079 2080 return rv; 2081 } 2082 2083 /* Called from receive_Data. 2084 * Synchronize packets on sock with packets on msock. 2085 * 2086 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 2087 * packet traveling on msock, they are still processed in the order they have 2088 * been sent. 2089 * 2090 * Note: we don't care for Ack packets overtaking P_DATA packets. 
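 *
 * A hypothetical worked example of the 32bit wrap-around comparison done
 * by seq_greater() above (values made up for illustration): with
 * device->peer_seq == 0xfffffffe and an incoming peer_seq == 2,
 * seq_greater(2, 0xfffffffe) evaluates (s32)2 - (s32)0xfffffffe == 2 - (-2) == 4 > 0,
 * so 2 is treated as the newer sequence number even though it is
 * numerically smaller.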
2091 * 2092 * In case packet_seq is larger than device->peer_seq number, there are 2093 * outstanding packets on the msock. We wait for them to arrive. 2094 * In case we are the logically next packet, we update device->peer_seq 2095 * ourselves. Correctly handles 32bit wrap around. 2096 * 2097 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 2098 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 2099 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 2100 * 1<<9 == 512 seconds aka ages for the 32bit wrap around... 2101 * 2102 * returns 0 if we may process the packet, 2103 * -ERESTARTSYS if we were interrupted (by disconnect signal). */ 2104 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 2105 { 2106 struct drbd_device *device = peer_device->device; 2107 DEFINE_WAIT(wait); 2108 long timeout; 2109 int ret = 0, tp; 2110 2111 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 2112 return 0; 2113 2114 spin_lock(&device->peer_seq_lock); 2115 for (;;) { 2116 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 2117 device->peer_seq = seq_max(device->peer_seq, peer_seq); 2118 break; 2119 } 2120 2121 if (signal_pending(current)) { 2122 ret = -ERESTARTSYS; 2123 break; 2124 } 2125 2126 rcu_read_lock(); 2127 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2128 rcu_read_unlock(); 2129 2130 if (!tp) 2131 break; 2132 2133 /* Only need to wait if two_primaries is enabled */ 2134 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2135 spin_unlock(&device->peer_seq_lock); 2136 rcu_read_lock(); 2137 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2138 rcu_read_unlock(); 2139 timeout = schedule_timeout(timeout); 2140 spin_lock(&device->peer_seq_lock); 2141 if (!timeout) { 2142 ret = -ETIMEDOUT; 2143 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2144 break; 2145 } 2146 } 2147 spin_unlock(&device->peer_seq_lock); 2148 finish_wait(&device->seq_wait, &wait); 2149 return ret; 2150 } 2151 2152 /* see also bio_flags_to_wire() 2153 * DRBD_REQ_*, because we need to semantically map the flags to data packet 2154 * flags and back. We may replicate to other kernel versions. */ 2155 static unsigned long wire_flags_to_bio(u32 dpf) 2156 { 2157 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2158 (dpf & DP_FUA ? REQ_FUA : 0) | 2159 (dpf & DP_FLUSH ? REQ_FLUSH : 0) | 2160 (dpf & DP_DISCARD ? 
REQ_DISCARD : 0); 2161 } 2162 2163 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2164 unsigned int size) 2165 { 2166 struct drbd_interval *i; 2167 2168 repeat: 2169 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2170 struct drbd_request *req; 2171 struct bio_and_error m; 2172 2173 if (!i->local) 2174 continue; 2175 req = container_of(i, struct drbd_request, i); 2176 if (!(req->rq_state & RQ_POSTPONED)) 2177 continue; 2178 req->rq_state &= ~RQ_POSTPONED; 2179 __req_mod(req, NEG_ACKED, &m); 2180 spin_unlock_irq(&device->resource->req_lock); 2181 if (m.bio) 2182 complete_master_bio(device, &m); 2183 spin_lock_irq(&device->resource->req_lock); 2184 goto repeat; 2185 } 2186 } 2187 2188 static int handle_write_conflicts(struct drbd_device *device, 2189 struct drbd_peer_request *peer_req) 2190 { 2191 struct drbd_connection *connection = peer_req->peer_device->connection; 2192 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2193 sector_t sector = peer_req->i.sector; 2194 const unsigned int size = peer_req->i.size; 2195 struct drbd_interval *i; 2196 bool equal; 2197 int err; 2198 2199 /* 2200 * Inserting the peer request into the write_requests tree will prevent 2201 * new conflicting local requests from being added. 2202 */ 2203 drbd_insert_interval(&device->write_requests, &peer_req->i); 2204 2205 repeat: 2206 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2207 if (i == &peer_req->i) 2208 continue; 2209 if (i->completed) 2210 continue; 2211 2212 if (!i->local) { 2213 /* 2214 * Our peer has sent a conflicting remote request; this 2215 * should not happen in a two-node setup. Wait for the 2216 * earlier peer request to complete. 2217 */ 2218 err = drbd_wait_misc(device, i); 2219 if (err) 2220 goto out; 2221 goto repeat; 2222 } 2223 2224 equal = i->sector == sector && i->size == size; 2225 if (resolve_conflicts) { 2226 /* 2227 * If the peer request is fully contained within the 2228 * overlapping request, it can be considered overwritten 2229 * and thus superseded; otherwise, it will be retried 2230 * once all overlapping requests have completed. 2231 */ 2232 bool superseded = i->sector <= sector && i->sector + 2233 (i->size >> 9) >= sector + (size >> 9); 2234 2235 if (!equal) 2236 drbd_alert(device, "Concurrent writes detected: " 2237 "local=%llus +%u, remote=%llus +%u, " 2238 "assuming %s came first\n", 2239 (unsigned long long)i->sector, i->size, 2240 (unsigned long long)sector, size, 2241 superseded ? "local" : "remote"); 2242 2243 peer_req->w.cb = superseded ? e_send_superseded : 2244 e_send_retry_write; 2245 list_add_tail(&peer_req->w.list, &device->done_ee); 2246 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work); 2247 2248 err = -ENOENT; 2249 goto out; 2250 } else { 2251 struct drbd_request *req = 2252 container_of(i, struct drbd_request, i); 2253 2254 if (!equal) 2255 drbd_alert(device, "Concurrent writes detected: " 2256 "local=%llus +%u, remote=%llus +%u\n", 2257 (unsigned long long)i->sector, i->size, 2258 (unsigned long long)sector, size); 2259 2260 if (req->rq_state & RQ_LOCAL_PENDING || 2261 !(req->rq_state & RQ_POSTPONED)) { 2262 /* 2263 * Wait for the node with the discard flag to 2264 * decide if this request has been superseded 2265 * or needs to be retried. 2266 * Requests that have been superseded will 2267 * disappear from the write_requests tree. 
2268 * 2269 * In addition, wait for the conflicting 2270 * request to finish locally before submitting 2271 * the conflicting peer request. 2272 */ 2273 err = drbd_wait_misc(device, &req->i); 2274 if (err) { 2275 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2276 fail_postponed_requests(device, sector, size); 2277 goto out; 2278 } 2279 goto repeat; 2280 } 2281 /* 2282 * Remember to restart the conflicting requests after 2283 * the new peer request has completed. 2284 */ 2285 peer_req->flags |= EE_RESTART_REQUESTS; 2286 } 2287 } 2288 err = 0; 2289 2290 out: 2291 if (err) 2292 drbd_remove_epoch_entry_interval(device, peer_req); 2293 return err; 2294 } 2295 2296 /* mirrored write */ 2297 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2298 { 2299 struct drbd_peer_device *peer_device; 2300 struct drbd_device *device; 2301 struct net_conf *nc; 2302 sector_t sector; 2303 struct drbd_peer_request *peer_req; 2304 struct p_data *p = pi->data; 2305 u32 peer_seq = be32_to_cpu(p->seq_num); 2306 int rw = WRITE; 2307 u32 dp_flags; 2308 int err, tp; 2309 2310 peer_device = conn_peer_device(connection, pi->vnr); 2311 if (!peer_device) 2312 return -EIO; 2313 device = peer_device->device; 2314 2315 if (!get_ldev(device)) { 2316 int err2; 2317 2318 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2319 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2320 atomic_inc(&connection->current_epoch->epoch_size); 2321 err2 = drbd_drain_block(peer_device, pi->size); 2322 if (!err) 2323 err = err2; 2324 return err; 2325 } 2326 2327 /* 2328 * Corresponding put_ldev done either below (on various errors), or in 2329 * drbd_peer_request_endio, if we successfully submit the data at the 2330 * end of this function. 
2331 */ 2332 2333 sector = be64_to_cpu(p->sector); 2334 peer_req = read_in_block(peer_device, p->block_id, sector, pi); 2335 if (!peer_req) { 2336 put_ldev(device); 2337 return -EIO; 2338 } 2339 2340 peer_req->w.cb = e_end_block; 2341 peer_req->submit_jif = jiffies; 2342 peer_req->flags |= EE_APPLICATION; 2343 2344 dp_flags = be32_to_cpu(p->dp_flags); 2345 rw |= wire_flags_to_bio(dp_flags); 2346 if (pi->cmd == P_TRIM) { 2347 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev); 2348 peer_req->flags |= EE_IS_TRIM; 2349 if (!blk_queue_discard(q)) 2350 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT; 2351 D_ASSERT(peer_device, peer_req->i.size > 0); 2352 D_ASSERT(peer_device, rw & REQ_DISCARD); 2353 D_ASSERT(peer_device, peer_req->pages == NULL); 2354 } else if (peer_req->pages == NULL) { 2355 D_ASSERT(device, peer_req->i.size == 0); 2356 D_ASSERT(device, dp_flags & DP_FLUSH); 2357 } 2358 2359 if (dp_flags & DP_MAY_SET_IN_SYNC) 2360 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2361 2362 spin_lock(&connection->epoch_lock); 2363 peer_req->epoch = connection->current_epoch; 2364 atomic_inc(&peer_req->epoch->epoch_size); 2365 atomic_inc(&peer_req->epoch->active); 2366 spin_unlock(&connection->epoch_lock); 2367 2368 rcu_read_lock(); 2369 nc = rcu_dereference(peer_device->connection->net_conf); 2370 tp = nc->two_primaries; 2371 if (peer_device->connection->agreed_pro_version < 100) { 2372 switch (nc->wire_protocol) { 2373 case DRBD_PROT_C: 2374 dp_flags |= DP_SEND_WRITE_ACK; 2375 break; 2376 case DRBD_PROT_B: 2377 dp_flags |= DP_SEND_RECEIVE_ACK; 2378 break; 2379 } 2380 } 2381 rcu_read_unlock(); 2382 2383 if (dp_flags & DP_SEND_WRITE_ACK) { 2384 peer_req->flags |= EE_SEND_WRITE_ACK; 2385 inc_unacked(device); 2386 /* corresponding dec_unacked() in e_end_block() 2387 * respective _drbd_clear_done_ee */ 2388 } 2389 2390 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2391 /* I really don't like it that the receiver thread 2392 * sends on the msock, but anyways */ 2393 drbd_send_ack(peer_device, P_RECV_ACK, peer_req); 2394 } 2395 2396 if (tp) { 2397 /* two primaries implies protocol C */ 2398 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK); 2399 peer_req->flags |= EE_IN_INTERVAL_TREE; 2400 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2401 if (err) 2402 goto out_interrupted; 2403 spin_lock_irq(&device->resource->req_lock); 2404 err = handle_write_conflicts(device, peer_req); 2405 if (err) { 2406 spin_unlock_irq(&device->resource->req_lock); 2407 if (err == -ENOENT) { 2408 put_ldev(device); 2409 return 0; 2410 } 2411 goto out_interrupted; 2412 } 2413 } else { 2414 update_peer_seq(peer_device, peer_seq); 2415 spin_lock_irq(&device->resource->req_lock); 2416 } 2417 /* if we use the zeroout fallback code, we process synchronously 2418 * and we wait for all pending requests, respectively wait for 2419 * active_ee to become empty in drbd_submit_peer_request(); 2420 * better not add ourselves here. 
*/ 2421 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0) 2422 list_add_tail(&peer_req->w.list, &device->active_ee); 2423 spin_unlock_irq(&device->resource->req_lock); 2424 2425 if (device->state.conn == C_SYNC_TARGET) 2426 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2427 2428 if (device->state.pdsk < D_INCONSISTENT) { 2429 /* In case we have the only disk of the cluster, */ 2430 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 2431 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2432 drbd_al_begin_io(device, &peer_req->i); 2433 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2434 } 2435 2436 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR); 2437 if (!err) 2438 return 0; 2439 2440 /* don't care for the reason here */ 2441 drbd_err(device, "submit failed, triggering re-connect\n"); 2442 spin_lock_irq(&device->resource->req_lock); 2443 list_del(&peer_req->w.list); 2444 drbd_remove_epoch_entry_interval(device, peer_req); 2445 spin_unlock_irq(&device->resource->req_lock); 2446 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) { 2447 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 2448 drbd_al_complete_io(device, &peer_req->i); 2449 } 2450 2451 out_interrupted: 2452 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP); 2453 put_ldev(device); 2454 drbd_free_peer_req(device, peer_req); 2455 return err; 2456 } 2457 2458 /* We may throttle resync, if the lower device seems to be busy, 2459 * and current sync rate is above c_min_rate. 2460 * 2461 * To decide whether or not the lower device is busy, we use a scheme similar 2462 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2463 * (more than 64 sectors) of activity we cannot account for with our own resync 2464 * activity, it obviously is "busy". 2465 * 2466 * The current sync rate used here uses only the most recent two step marks, 2467 * to have a short time average so we can react faster. 2468 */ 2469 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector, 2470 bool throttle_if_app_is_waiting) 2471 { 2472 struct lc_element *tmp; 2473 bool throttle = drbd_rs_c_min_rate_throttle(device); 2474 2475 if (!throttle || throttle_if_app_is_waiting) 2476 return throttle; 2477 2478 spin_lock_irq(&device->al_lock); 2479 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2480 if (tmp) { 2481 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2482 if (test_bit(BME_PRIORITY, &bm_ext->flags)) 2483 throttle = false; 2484 /* Do not slow down if app IO is already waiting for this extent, 2485 * and our progress is necessary for application IO to complete. */ 2486 } 2487 spin_unlock_irq(&device->al_lock); 2488 2489 return throttle; 2490 } 2491 2492 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) 2493 { 2494 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 2495 unsigned long db, dt, dbdt; 2496 unsigned int c_min_rate; 2497 int curr_events; 2498 2499 rcu_read_lock(); 2500 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2501 rcu_read_unlock(); 2502 2503 /* feature disabled? 
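 * (c_min_rate == 0 turns this throttling check off entirely; the rate
 * computed below, dbdt, is in KiB/s and is compared against it.)
 *
 * A hypothetical worked example of the decision below, with made-up
 * numbers: if the sync mark taken 3 seconds ago recorded
 * rs_mark_left == 30000 bits and rs_left is now 24000 bits, then
 * db == 6000 bits and db/dt == 2000 bits/s; with 4 KiB of data tracked
 * per bitmap bit that gives dbdt == 8000 KiB/s, so the resync would be
 * throttled whenever c_min_rate is configured below that.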
*/ 2504 if (c_min_rate == 0) 2505 return false; 2506 2507 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) + 2508 (int)part_stat_read(&disk->part0, sectors[1]) - 2509 atomic_read(&device->rs_sect_ev); 2510 2511 if (atomic_read(&device->ap_actlog_cnt) 2512 || curr_events - device->rs_last_events > 64) { 2513 unsigned long rs_left; 2514 int i; 2515 2516 device->rs_last_events = curr_events; 2517 2518 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2519 * approx. */ 2520 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2521 2522 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2523 rs_left = device->ov_left; 2524 else 2525 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2526 2527 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2528 if (!dt) 2529 dt++; 2530 db = device->rs_mark_left[i] - rs_left; 2531 dbdt = Bit2KB(db/dt); 2532 2533 if (dbdt > c_min_rate) 2534 return true; 2535 } 2536 return false; 2537 } 2538 2539 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2540 { 2541 struct drbd_peer_device *peer_device; 2542 struct drbd_device *device; 2543 sector_t sector; 2544 sector_t capacity; 2545 struct drbd_peer_request *peer_req; 2546 struct digest_info *di = NULL; 2547 int size, verb; 2548 unsigned int fault_type; 2549 struct p_block_req *p = pi->data; 2550 2551 peer_device = conn_peer_device(connection, pi->vnr); 2552 if (!peer_device) 2553 return -EIO; 2554 device = peer_device->device; 2555 capacity = drbd_get_capacity(device->this_bdev); 2556 2557 sector = be64_to_cpu(p->sector); 2558 size = be32_to_cpu(p->blksize); 2559 2560 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2561 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2562 (unsigned long long)sector, size); 2563 return -EINVAL; 2564 } 2565 if (sector + (size>>9) > capacity) { 2566 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2567 (unsigned long long)sector, size); 2568 return -EINVAL; 2569 } 2570 2571 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2572 verb = 1; 2573 switch (pi->cmd) { 2574 case P_DATA_REQUEST: 2575 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2576 break; 2577 case P_RS_DATA_REQUEST: 2578 case P_CSUM_RS_REQUEST: 2579 case P_OV_REQUEST: 2580 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2581 break; 2582 case P_OV_REPLY: 2583 verb = 0; 2584 dec_rs_pending(device); 2585 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC); 2586 break; 2587 default: 2588 BUG(); 2589 } 2590 if (verb && __ratelimit(&drbd_ratelimit_state)) 2591 drbd_err(device, "Can not satisfy peer's read request, " 2592 "no local data.\n"); 2593 2594 /* drain possibly payload */ 2595 return drbd_drain_block(peer_device, pi->size); 2596 } 2597 2598 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2599 * "criss-cross" setup, that might cause write-out on some other DRBD, 2600 * which in turn might block on the other node at this very place. 
*/ 2601 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, 2602 true /* has real payload */, GFP_NOIO); 2603 if (!peer_req) { 2604 put_ldev(device); 2605 return -ENOMEM; 2606 } 2607 2608 switch (pi->cmd) { 2609 case P_DATA_REQUEST: 2610 peer_req->w.cb = w_e_end_data_req; 2611 fault_type = DRBD_FAULT_DT_RD; 2612 /* application IO, don't drbd_rs_begin_io */ 2613 peer_req->flags |= EE_APPLICATION; 2614 goto submit; 2615 2616 case P_RS_DATA_REQUEST: 2617 peer_req->w.cb = w_e_end_rsdata_req; 2618 fault_type = DRBD_FAULT_RS_RD; 2619 /* used in the sector offset progress display */ 2620 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2621 break; 2622 2623 case P_OV_REPLY: 2624 case P_CSUM_RS_REQUEST: 2625 fault_type = DRBD_FAULT_RS_RD; 2626 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2627 if (!di) 2628 goto out_free_e; 2629 2630 di->digest_size = pi->size; 2631 di->digest = (((char *)di)+sizeof(struct digest_info)); 2632 2633 peer_req->digest = di; 2634 peer_req->flags |= EE_HAS_DIGEST; 2635 2636 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2637 goto out_free_e; 2638 2639 if (pi->cmd == P_CSUM_RS_REQUEST) { 2640 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2641 peer_req->w.cb = w_e_end_csum_rs_req; 2642 /* used in the sector offset progress display */ 2643 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2644 /* remember to report stats in drbd_resync_finished */ 2645 device->use_csums = true; 2646 } else if (pi->cmd == P_OV_REPLY) { 2647 /* track progress, we may need to throttle */ 2648 atomic_add(size >> 9, &device->rs_sect_in); 2649 peer_req->w.cb = w_e_end_ov_reply; 2650 dec_rs_pending(device); 2651 /* drbd_rs_begin_io done when we sent this request, 2652 * but accounting still needs to be done. */ 2653 goto submit_for_resync; 2654 } 2655 break; 2656 2657 case P_OV_REQUEST: 2658 if (device->ov_start_sector == ~(sector_t)0 && 2659 peer_device->connection->agreed_pro_version >= 90) { 2660 unsigned long now = jiffies; 2661 int i; 2662 device->ov_start_sector = sector; 2663 device->ov_position = sector; 2664 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2665 device->rs_total = device->ov_left; 2666 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2667 device->rs_mark_left[i] = device->ov_left; 2668 device->rs_mark_time[i] = now; 2669 } 2670 drbd_info(device, "Online Verify start sector: %llu\n", 2671 (unsigned long long)sector); 2672 } 2673 peer_req->w.cb = w_e_end_ov_req; 2674 fault_type = DRBD_FAULT_RS_RD; 2675 break; 2676 2677 default: 2678 BUG(); 2679 } 2680 2681 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2682 * wrt the receiver, but it is not as straightforward as it may seem. 2683 * Various places in the resync start and stop logic assume resync 2684 * requests are processed in order, requeuing this on the worker thread 2685 * introduces a bunch of new code for synchronization between threads. 2686 * 2687 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2688 * "forever", throttling after drbd_rs_begin_io will lock that extent 2689 * for application writes for the same time. For now, just throttle 2690 * here, where the rest of the code expects the receiver to sleep for 2691 * a while, anyways. 2692 */ 2693 2694 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2695 * this defers syncer requests for some time, before letting at least 2696 * on request through. 
The resync controller on the receiving side 2697 * will adapt to the incoming rate accordingly. 2698 * 2699 * We cannot throttle here if remote is Primary/SyncTarget: 2700 * we would also throttle its application reads. 2701 * In that case, throttling is done on the SyncTarget only. 2702 */ 2703 2704 /* Even though this may be a resync request, we do add to "read_ee"; 2705 * "sync_ee" is only used for resync WRITEs. 2706 * Add to list early, so debugfs can find this request 2707 * even if we have to sleep below. */ 2708 spin_lock_irq(&device->resource->req_lock); 2709 list_add_tail(&peer_req->w.list, &device->read_ee); 2710 spin_unlock_irq(&device->resource->req_lock); 2711 2712 update_receiver_timing_details(connection, drbd_rs_should_slow_down); 2713 if (device->state.peer != R_PRIMARY 2714 && drbd_rs_should_slow_down(device, sector, false)) 2715 schedule_timeout_uninterruptible(HZ/10); 2716 update_receiver_timing_details(connection, drbd_rs_begin_io); 2717 if (drbd_rs_begin_io(device, sector)) 2718 goto out_free_e; 2719 2720 submit_for_resync: 2721 atomic_add(size >> 9, &device->rs_sect_ev); 2722 2723 submit: 2724 update_receiver_timing_details(connection, drbd_submit_peer_request); 2725 inc_unacked(device); 2726 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0) 2727 return 0; 2728 2729 /* don't care for the reason here */ 2730 drbd_err(device, "submit failed, triggering re-connect\n"); 2731 2732 out_free_e: 2733 spin_lock_irq(&device->resource->req_lock); 2734 list_del(&peer_req->w.list); 2735 spin_unlock_irq(&device->resource->req_lock); 2736 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2737 2738 put_ldev(device); 2739 drbd_free_peer_req(device, peer_req); 2740 return -EIO; 2741 } 2742 2743 /** 2744 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2745 */ 2746 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2747 { 2748 struct drbd_device *device = peer_device->device; 2749 int self, peer, rv = -100; 2750 unsigned long ch_self, ch_peer; 2751 enum drbd_after_sb_p after_sb_0p; 2752 2753 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2754 peer = device->p_uuid[UI_BITMAP] & 1; 2755 2756 ch_peer = device->p_uuid[UI_SIZE]; 2757 ch_self = device->comm_bm_set; 2758 2759 rcu_read_lock(); 2760 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2761 rcu_read_unlock(); 2762 switch (after_sb_0p) { 2763 case ASB_CONSENSUS: 2764 case ASB_DISCARD_SECONDARY: 2765 case ASB_CALL_HELPER: 2766 case ASB_VIOLENTLY: 2767 drbd_err(device, "Configuration error.\n"); 2768 break; 2769 case ASB_DISCONNECT: 2770 break; 2771 case ASB_DISCARD_YOUNGER_PRI: 2772 if (self == 0 && peer == 1) { 2773 rv = -1; 2774 break; 2775 } 2776 if (self == 1 && peer == 0) { 2777 rv = 1; 2778 break; 2779 } 2780 /* Else fall through to one of the other strategies... */ 2781 case ASB_DISCARD_OLDER_PRI: 2782 if (self == 0 && peer == 1) { 2783 rv = 1; 2784 break; 2785 } 2786 if (self == 1 && peer == 0) { 2787 rv = -1; 2788 break; 2789 } 2790 /* Else fall through to one of the other strategies... */ 2791 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2792 "Using discard-least-changes instead\n"); 2793 case ASB_DISCARD_ZERO_CHG: 2794 if (ch_peer == 0 && ch_self == 0) { 2795 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2796 ? 
-1 : 1; 2797 break; 2798 } else { 2799 if (ch_peer == 0) { rv = 1; break; } 2800 if (ch_self == 0) { rv = -1; break; } 2801 } 2802 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2803 break; 2804 case ASB_DISCARD_LEAST_CHG: 2805 if (ch_self < ch_peer) 2806 rv = -1; 2807 else if (ch_self > ch_peer) 2808 rv = 1; 2809 else /* ( ch_self == ch_peer ) */ 2810 /* Well, then use something else. */ 2811 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2812 ? -1 : 1; 2813 break; 2814 case ASB_DISCARD_LOCAL: 2815 rv = -1; 2816 break; 2817 case ASB_DISCARD_REMOTE: 2818 rv = 1; 2819 } 2820 2821 return rv; 2822 } 2823 2824 /** 2825 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2826 */ 2827 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2828 { 2829 struct drbd_device *device = peer_device->device; 2830 int hg, rv = -100; 2831 enum drbd_after_sb_p after_sb_1p; 2832 2833 rcu_read_lock(); 2834 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2835 rcu_read_unlock(); 2836 switch (after_sb_1p) { 2837 case ASB_DISCARD_YOUNGER_PRI: 2838 case ASB_DISCARD_OLDER_PRI: 2839 case ASB_DISCARD_LEAST_CHG: 2840 case ASB_DISCARD_LOCAL: 2841 case ASB_DISCARD_REMOTE: 2842 case ASB_DISCARD_ZERO_CHG: 2843 drbd_err(device, "Configuration error.\n"); 2844 break; 2845 case ASB_DISCONNECT: 2846 break; 2847 case ASB_CONSENSUS: 2848 hg = drbd_asb_recover_0p(peer_device); 2849 if (hg == -1 && device->state.role == R_SECONDARY) 2850 rv = hg; 2851 if (hg == 1 && device->state.role == R_PRIMARY) 2852 rv = hg; 2853 break; 2854 case ASB_VIOLENTLY: 2855 rv = drbd_asb_recover_0p(peer_device); 2856 break; 2857 case ASB_DISCARD_SECONDARY: 2858 return device->state.role == R_PRIMARY ? 1 : -1; 2859 case ASB_CALL_HELPER: 2860 hg = drbd_asb_recover_0p(peer_device); 2861 if (hg == -1 && device->state.role == R_PRIMARY) { 2862 enum drbd_state_rv rv2; 2863 2864 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2865 * we might be here in C_WF_REPORT_PARAMS which is transient. 2866 * we do not need to wait for the after state change work either. 
*/ 2867 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2868 if (rv2 != SS_SUCCESS) { 2869 drbd_khelper(device, "pri-lost-after-sb"); 2870 } else { 2871 drbd_warn(device, "Successfully gave up primary role.\n"); 2872 rv = hg; 2873 } 2874 } else 2875 rv = hg; 2876 } 2877 2878 return rv; 2879 } 2880 2881 /** 2882 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2883 */ 2884 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2885 { 2886 struct drbd_device *device = peer_device->device; 2887 int hg, rv = -100; 2888 enum drbd_after_sb_p after_sb_2p; 2889 2890 rcu_read_lock(); 2891 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2892 rcu_read_unlock(); 2893 switch (after_sb_2p) { 2894 case ASB_DISCARD_YOUNGER_PRI: 2895 case ASB_DISCARD_OLDER_PRI: 2896 case ASB_DISCARD_LEAST_CHG: 2897 case ASB_DISCARD_LOCAL: 2898 case ASB_DISCARD_REMOTE: 2899 case ASB_CONSENSUS: 2900 case ASB_DISCARD_SECONDARY: 2901 case ASB_DISCARD_ZERO_CHG: 2902 drbd_err(device, "Configuration error.\n"); 2903 break; 2904 case ASB_VIOLENTLY: 2905 rv = drbd_asb_recover_0p(peer_device); 2906 break; 2907 case ASB_DISCONNECT: 2908 break; 2909 case ASB_CALL_HELPER: 2910 hg = drbd_asb_recover_0p(peer_device); 2911 if (hg == -1) { 2912 enum drbd_state_rv rv2; 2913 2914 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2915 * we might be here in C_WF_REPORT_PARAMS which is transient. 2916 * we do not need to wait for the after state change work either. */ 2917 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2918 if (rv2 != SS_SUCCESS) { 2919 drbd_khelper(device, "pri-lost-after-sb"); 2920 } else { 2921 drbd_warn(device, "Successfully gave up primary role.\n"); 2922 rv = hg; 2923 } 2924 } else 2925 rv = hg; 2926 } 2927 2928 return rv; 2929 } 2930 2931 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 2932 u64 bits, u64 flags) 2933 { 2934 if (!uuid) { 2935 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 2936 return; 2937 } 2938 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 2939 text, 2940 (unsigned long long)uuid[UI_CURRENT], 2941 (unsigned long long)uuid[UI_BITMAP], 2942 (unsigned long long)uuid[UI_HISTORY_START], 2943 (unsigned long long)uuid[UI_HISTORY_END], 2944 (unsigned long long)bits, 2945 (unsigned long long)flags); 2946 } 2947 2948 /* 2949 100 after split brain try auto recover 2950 2 C_SYNC_SOURCE set BitMap 2951 1 C_SYNC_SOURCE use BitMap 2952 0 no Sync 2953 -1 C_SYNC_TARGET use BitMap 2954 -2 C_SYNC_TARGET set BitMap 2955 -100 after split brain, disconnect 2956 -1000 unrelated data 2957 -1091 requires proto 91 2958 -1096 requires proto 96 2959 */ 2960 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local) 2961 { 2962 struct drbd_peer_device *const peer_device = first_peer_device(device); 2963 struct drbd_connection *const connection = peer_device ? 
peer_device->connection : NULL; 2964 u64 self, peer; 2965 int i, j; 2966 2967 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 2968 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 2969 2970 *rule_nr = 10; 2971 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 2972 return 0; 2973 2974 *rule_nr = 20; 2975 if ((self == UUID_JUST_CREATED || self == (u64)0) && 2976 peer != UUID_JUST_CREATED) 2977 return -2; 2978 2979 *rule_nr = 30; 2980 if (self != UUID_JUST_CREATED && 2981 (peer == UUID_JUST_CREATED || peer == (u64)0)) 2982 return 2; 2983 2984 if (self == peer) { 2985 int rct, dc; /* roles at crash time */ 2986 2987 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 2988 2989 if (connection->agreed_pro_version < 91) 2990 return -1091; 2991 2992 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 2993 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 2994 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 2995 drbd_uuid_move_history(device); 2996 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 2997 device->ldev->md.uuid[UI_BITMAP] = 0; 2998 2999 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3000 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 3001 *rule_nr = 34; 3002 } else { 3003 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 3004 *rule_nr = 36; 3005 } 3006 3007 return 1; 3008 } 3009 3010 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 3011 3012 if (connection->agreed_pro_version < 91) 3013 return -1091; 3014 3015 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 3016 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 3017 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 3018 3019 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 3020 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 3021 device->p_uuid[UI_BITMAP] = 0UL; 3022 3023 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3024 *rule_nr = 35; 3025 } else { 3026 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 3027 *rule_nr = 37; 3028 } 3029 3030 return -1; 3031 } 3032 3033 /* Common power [off|failure] */ 3034 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 3035 (device->p_uuid[UI_FLAGS] & 2); 3036 /* lowest bit is set when we were primary, 3037 * next bit (weight 2) is set when peer was primary */ 3038 *rule_nr = 40; 3039 3040 switch (rct) { 3041 case 0: /* !self_pri && !peer_pri */ return 0; 3042 case 1: /* self_pri && !peer_pri */ return 1; 3043 case 2: /* !self_pri && peer_pri */ return -1; 3044 case 3: /* self_pri && peer_pri */ 3045 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags); 3046 return dc ? -1 : 1; 3047 } 3048 } 3049 3050 *rule_nr = 50; 3051 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3052 if (self == peer) 3053 return -1; 3054 3055 *rule_nr = 51; 3056 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 3057 if (self == peer) { 3058 if (connection->agreed_pro_version < 96 ? 
3059 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 3060 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 3061 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 3062 /* The last P_SYNC_UUID did not get though. Undo the last start of 3063 resync as sync source modifications of the peer's UUIDs. */ 3064 3065 if (connection->agreed_pro_version < 91) 3066 return -1091; 3067 3068 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 3069 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 3070 3071 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 3072 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3073 3074 return -1; 3075 } 3076 } 3077 3078 *rule_nr = 60; 3079 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3080 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3081 peer = device->p_uuid[i] & ~((u64)1); 3082 if (self == peer) 3083 return -2; 3084 } 3085 3086 *rule_nr = 70; 3087 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3088 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3089 if (self == peer) 3090 return 1; 3091 3092 *rule_nr = 71; 3093 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 3094 if (self == peer) { 3095 if (connection->agreed_pro_version < 96 ? 3096 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 3097 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 3098 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 3099 /* The last P_SYNC_UUID did not get though. Undo the last start of 3100 resync as sync source modifications of our UUIDs. */ 3101 3102 if (connection->agreed_pro_version < 91) 3103 return -1091; 3104 3105 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 3106 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 3107 3108 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 3109 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3110 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 3111 3112 return 1; 3113 } 3114 } 3115 3116 3117 *rule_nr = 80; 3118 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3119 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3120 self = device->ldev->md.uuid[i] & ~((u64)1); 3121 if (self == peer) 3122 return 2; 3123 } 3124 3125 *rule_nr = 90; 3126 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3127 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3128 if (self == peer && self != ((u64)0)) 3129 return 100; 3130 3131 *rule_nr = 100; 3132 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3133 self = device->ldev->md.uuid[i] & ~((u64)1); 3134 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3135 peer = device->p_uuid[j] & ~((u64)1); 3136 if (self == peer) 3137 return -100; 3138 } 3139 } 3140 3141 return -1000; 3142 } 3143 3144 /* drbd_sync_handshake() returns the new conn state on success, or 3145 CONN_MASK (-1) on failure. 
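   On success the result depends on the hg value computed by
   drbd_uuid_compare(): C_WF_BITMAP_S when this node becomes sync source
   (hg > 0), C_WF_BITMAP_T when it becomes sync target (hg < 0), and
   C_CONNECTED when no resync is needed (hg == 0).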
3146 */
3147 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148 enum drbd_role peer_role,
3149 enum drbd_disk_state peer_disk) __must_hold(local)
3150 {
3151 struct drbd_device *device = peer_device->device;
3152 enum drbd_conns rv = C_MASK;
3153 enum drbd_disk_state mydisk;
3154 struct net_conf *nc;
3155 int hg, rule_nr, rr_conflict, tentative;
3156
3157 mydisk = device->state.disk;
3158 if (mydisk == D_NEGOTIATING)
3159 mydisk = device->new_state_tmp.disk;
3160
3161 drbd_info(device, "drbd_sync_handshake:\n");
3162
3163 spin_lock_irq(&device->ldev->md.uuid_lock);
3164 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165 drbd_uuid_dump(device, "peer", device->p_uuid,
3166 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3167
3168 hg = drbd_uuid_compare(device, &rule_nr);
3169 spin_unlock_irq(&device->ldev->md.uuid_lock);
3170
3171 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3172
3173 if (hg == -1000) {
3174 drbd_alert(device, "Unrelated data, aborting!\n");
3175 return C_MASK;
3176 }
3177 if (hg < -1000) {
3178 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179 return C_MASK;
3180 }
3181
3182 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3184 int f = (hg == -100) || abs(hg) == 2;
3185 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186 if (f)
3187 hg = hg*2;
3188 drbd_info(device, "Becoming sync %s due to disk states.\n",
3189 hg > 0 ? "source" : "target");
3190 }
3191
3192 if (abs(hg) == 100)
3193 drbd_khelper(device, "initial-split-brain");
3194
3195 rcu_read_lock();
3196 nc = rcu_dereference(peer_device->connection->net_conf);
3197
3198 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199 int pcount = (device->state.role == R_PRIMARY)
3200 + (peer_role == R_PRIMARY);
3201 int forced = (hg == -100);
3202
3203 switch (pcount) {
3204 case 0:
3205 hg = drbd_asb_recover_0p(peer_device);
3206 break;
3207 case 1:
3208 hg = drbd_asb_recover_1p(peer_device);
3209 break;
3210 case 2:
3211 hg = drbd_asb_recover_2p(peer_device);
3212 break;
3213 }
3214 if (abs(hg) < 100) {
3215 drbd_warn(device, "Split-Brain detected, %d primaries, "
3216 "automatically solved. Sync from %s node\n",
3217 pcount, (hg < 0) ? "peer" : "this");
3218 if (forced) {
3219 drbd_warn(device, "Doing a full sync, since"
3220 " UUIDs were ambiguous.\n");
3221 hg = hg*2;
3222 }
3223 }
3224 }
3225
3226 if (hg == -100) {
3227 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228 hg = -1;
3229 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230 hg = 1;
3231
3232 if (abs(hg) < 100)
3233 drbd_warn(device, "Split-Brain detected, manually solved. "
3234 "Sync from %s node\n",
3235 (hg < 0) ? "peer" : "this");
3236 }
3237 rr_conflict = nc->rr_conflict;
3238 tentative = nc->tentative;
3239 rcu_read_unlock();
3240
3241 if (hg == -100) {
3242 /* FIXME this log message is not correct if we end up here
3243 * after an attempted attach on a diskless node.
3244 * We just refuse to attach -- well, we drop the "connection"
3245 * to that disk, in a way...
*/
3246 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247 drbd_khelper(device, "split-brain");
3248 return C_MASK;
3249 }
3250
3251 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253 return C_MASK;
3254 }
3255
3256 if (hg < 0 && /* by intention we do not use mydisk here. */
3257 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258 switch (rr_conflict) {
3259 case ASB_CALL_HELPER:
3260 drbd_khelper(device, "pri-lost");
3261 /* fall through */
3262 case ASB_DISCONNECT:
3263 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264 return C_MASK;
3265 case ASB_VIOLENTLY:
3266 drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3267 "assumption\n");
3268 }
3269 }
3270
3271 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272 if (hg == 0)
3273 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274 else
3275 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3276 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277 abs(hg) >= 2 ? "full" : "bit-map based");
3278 return C_MASK;
3279 }
3280
3281 if (abs(hg) >= 2) {
3282 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284 BM_LOCKED_SET_ALLOWED))
3285 return C_MASK;
3286 }
3287
3288 if (hg > 0) { /* become sync source. */
3289 rv = C_WF_BITMAP_S;
3290 } else if (hg < 0) { /* become sync target */
3291 rv = C_WF_BITMAP_T;
3292 } else {
3293 rv = C_CONNECTED;
3294 if (drbd_bm_total_weight(device)) {
3295 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296 drbd_bm_total_weight(device));
3297 }
3298 }
3299
3300 return rv;
3301 }
3302
3303 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3304 {
3305 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3306 if (peer == ASB_DISCARD_REMOTE)
3307 return ASB_DISCARD_LOCAL;
3308
3309 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3310 if (peer == ASB_DISCARD_LOCAL)
3311 return ASB_DISCARD_REMOTE;
3312
3313 /* everything else is valid if they are equal on both sides.
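 *
 * For example, a peer that reports after-sb-0pri "discard-remote" is
 * talking about *our* data, which corresponds to "discard-local" on
 * this node; receive_protocol() below uses this conversion and drops
 * the connection if the converted setting does not match the local one.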
*/ 3314 return peer; 3315 } 3316 3317 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3318 { 3319 struct p_protocol *p = pi->data; 3320 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3321 int p_proto, p_discard_my_data, p_two_primaries, cf; 3322 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3323 char integrity_alg[SHARED_SECRET_MAX] = ""; 3324 struct crypto_hash *peer_integrity_tfm = NULL; 3325 void *int_dig_in = NULL, *int_dig_vv = NULL; 3326 3327 p_proto = be32_to_cpu(p->protocol); 3328 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3329 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3330 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3331 p_two_primaries = be32_to_cpu(p->two_primaries); 3332 cf = be32_to_cpu(p->conn_flags); 3333 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3334 3335 if (connection->agreed_pro_version >= 87) { 3336 int err; 3337 3338 if (pi->size > sizeof(integrity_alg)) 3339 return -EIO; 3340 err = drbd_recv_all(connection, integrity_alg, pi->size); 3341 if (err) 3342 return err; 3343 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3344 } 3345 3346 if (pi->cmd != P_PROTOCOL_UPDATE) { 3347 clear_bit(CONN_DRY_RUN, &connection->flags); 3348 3349 if (cf & CF_DRY_RUN) 3350 set_bit(CONN_DRY_RUN, &connection->flags); 3351 3352 rcu_read_lock(); 3353 nc = rcu_dereference(connection->net_conf); 3354 3355 if (p_proto != nc->wire_protocol) { 3356 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3357 goto disconnect_rcu_unlock; 3358 } 3359 3360 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3361 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3362 goto disconnect_rcu_unlock; 3363 } 3364 3365 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3366 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3367 goto disconnect_rcu_unlock; 3368 } 3369 3370 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3371 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3372 goto disconnect_rcu_unlock; 3373 } 3374 3375 if (p_discard_my_data && nc->discard_my_data) { 3376 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3377 goto disconnect_rcu_unlock; 3378 } 3379 3380 if (p_two_primaries != nc->two_primaries) { 3381 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3382 goto disconnect_rcu_unlock; 3383 } 3384 3385 if (strcmp(integrity_alg, nc->integrity_alg)) { 3386 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3387 goto disconnect_rcu_unlock; 3388 } 3389 3390 rcu_read_unlock(); 3391 } 3392 3393 if (integrity_alg[0]) { 3394 int hash_size; 3395 3396 /* 3397 * We can only change the peer data integrity algorithm 3398 * here. Changing our own data integrity algorithm 3399 * requires that we send a P_PROTOCOL_UPDATE packet at 3400 * the same time; otherwise, the peer has no way to 3401 * tell between which packets the algorithm should 3402 * change. 
3403 */ 3404 3405 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC); 3406 if (!peer_integrity_tfm) { 3407 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3408 integrity_alg); 3409 goto disconnect; 3410 } 3411 3412 hash_size = crypto_hash_digestsize(peer_integrity_tfm); 3413 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3414 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3415 if (!(int_dig_in && int_dig_vv)) { 3416 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3417 goto disconnect; 3418 } 3419 } 3420 3421 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); 3422 if (!new_net_conf) { 3423 drbd_err(connection, "Allocation of new net_conf failed\n"); 3424 goto disconnect; 3425 } 3426 3427 mutex_lock(&connection->data.mutex); 3428 mutex_lock(&connection->resource->conf_update); 3429 old_net_conf = connection->net_conf; 3430 *new_net_conf = *old_net_conf; 3431 3432 new_net_conf->wire_protocol = p_proto; 3433 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3434 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3435 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3436 new_net_conf->two_primaries = p_two_primaries; 3437 3438 rcu_assign_pointer(connection->net_conf, new_net_conf); 3439 mutex_unlock(&connection->resource->conf_update); 3440 mutex_unlock(&connection->data.mutex); 3441 3442 crypto_free_hash(connection->peer_integrity_tfm); 3443 kfree(connection->int_dig_in); 3444 kfree(connection->int_dig_vv); 3445 connection->peer_integrity_tfm = peer_integrity_tfm; 3446 connection->int_dig_in = int_dig_in; 3447 connection->int_dig_vv = int_dig_vv; 3448 3449 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3450 drbd_info(connection, "peer data-integrity-alg: %s\n", 3451 integrity_alg[0] ? integrity_alg : "(none)"); 3452 3453 synchronize_rcu(); 3454 kfree(old_net_conf); 3455 return 0; 3456 3457 disconnect_rcu_unlock: 3458 rcu_read_unlock(); 3459 disconnect: 3460 crypto_free_hash(peer_integrity_tfm); 3461 kfree(int_dig_in); 3462 kfree(int_dig_vv); 3463 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3464 return -EIO; 3465 } 3466 3467 /* helper function 3468 * input: alg name, feature name 3469 * return: NULL (alg name was "") 3470 * ERR_PTR(error) if something goes wrong 3471 * or the crypto hash ptr, if it worked out ok. 
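 *
 * Illustrative caller pattern, mirroring how receive_SyncParam() below
 * uses this helper for the verify-alg and csums-alg settings; the error
 * is already logged here, so the caller only has to check the result:
 *
 *   tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
 *   if (IS_ERR(tfm))
 *           goto disconnect;
 *
 * A NULL return simply means no algorithm was requested.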
*/ 3472 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device, 3473 const char *alg, const char *name) 3474 { 3475 struct crypto_hash *tfm; 3476 3477 if (!alg[0]) 3478 return NULL; 3479 3480 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC); 3481 if (IS_ERR(tfm)) { 3482 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3483 alg, name, PTR_ERR(tfm)); 3484 return tfm; 3485 } 3486 return tfm; 3487 } 3488 3489 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3490 { 3491 void *buffer = connection->data.rbuf; 3492 int size = pi->size; 3493 3494 while (size) { 3495 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3496 s = drbd_recv(connection, buffer, s); 3497 if (s <= 0) { 3498 if (s < 0) 3499 return s; 3500 break; 3501 } 3502 size -= s; 3503 } 3504 if (size) 3505 return -EIO; 3506 return 0; 3507 } 3508 3509 /* 3510 * config_unknown_volume - device configuration command for unknown volume 3511 * 3512 * When a device is added to an existing connection, the node on which the 3513 * device is added first will send configuration commands to its peer but the 3514 * peer will not know about the device yet. It will warn and ignore these 3515 * commands. Once the device is added on the second node, the second node will 3516 * send the same device configuration commands, but in the other direction. 3517 * 3518 * (We can also end up here if drbd is misconfigured.) 3519 */ 3520 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3521 { 3522 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3523 cmdname(pi->cmd), pi->vnr); 3524 return ignore_remaining_packet(connection, pi); 3525 } 3526 3527 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3528 { 3529 struct drbd_peer_device *peer_device; 3530 struct drbd_device *device; 3531 struct p_rs_param_95 *p; 3532 unsigned int header_size, data_size, exp_max_sz; 3533 struct crypto_hash *verify_tfm = NULL; 3534 struct crypto_hash *csums_tfm = NULL; 3535 struct net_conf *old_net_conf, *new_net_conf = NULL; 3536 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3537 const int apv = connection->agreed_pro_version; 3538 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3539 int fifo_size = 0; 3540 int err; 3541 3542 peer_device = conn_peer_device(connection, pi->vnr); 3543 if (!peer_device) 3544 return config_unknown_volume(connection, pi); 3545 device = peer_device->device; 3546 3547 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3548 : apv == 88 ? sizeof(struct p_rs_param) 3549 + SHARED_SECRET_MAX 3550 : apv <= 94 ? 
sizeof(struct p_rs_param_89) 3551 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3552 3553 if (pi->size > exp_max_sz) { 3554 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3555 pi->size, exp_max_sz); 3556 return -EIO; 3557 } 3558 3559 if (apv <= 88) { 3560 header_size = sizeof(struct p_rs_param); 3561 data_size = pi->size - header_size; 3562 } else if (apv <= 94) { 3563 header_size = sizeof(struct p_rs_param_89); 3564 data_size = pi->size - header_size; 3565 D_ASSERT(device, data_size == 0); 3566 } else { 3567 header_size = sizeof(struct p_rs_param_95); 3568 data_size = pi->size - header_size; 3569 D_ASSERT(device, data_size == 0); 3570 } 3571 3572 /* initialize verify_alg and csums_alg */ 3573 p = pi->data; 3574 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX); 3575 3576 err = drbd_recv_all(peer_device->connection, p, header_size); 3577 if (err) 3578 return err; 3579 3580 mutex_lock(&connection->resource->conf_update); 3581 old_net_conf = peer_device->connection->net_conf; 3582 if (get_ldev(device)) { 3583 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3584 if (!new_disk_conf) { 3585 put_ldev(device); 3586 mutex_unlock(&connection->resource->conf_update); 3587 drbd_err(device, "Allocation of new disk_conf failed\n"); 3588 return -ENOMEM; 3589 } 3590 3591 old_disk_conf = device->ldev->disk_conf; 3592 *new_disk_conf = *old_disk_conf; 3593 3594 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3595 } 3596 3597 if (apv >= 88) { 3598 if (apv == 88) { 3599 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3600 drbd_err(device, "verify-alg of wrong size, " 3601 "peer wants %u, accepting only up to %u byte\n", 3602 data_size, SHARED_SECRET_MAX); 3603 err = -EIO; 3604 goto reconnect; 3605 } 3606 3607 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3608 if (err) 3609 goto reconnect; 3610 /* we expect NUL terminated string */ 3611 /* but just in case someone tries to be evil */ 3612 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3613 p->verify_alg[data_size-1] = 0; 3614 3615 } else /* apv >= 89 */ { 3616 /* we still expect NUL terminated strings */ 3617 /* but just in case someone tries to be evil */ 3618 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3619 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3620 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3621 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3622 } 3623 3624 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3625 if (device->state.conn == C_WF_REPORT_PARAMS) { 3626 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3627 old_net_conf->verify_alg, p->verify_alg); 3628 goto disconnect; 3629 } 3630 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3631 p->verify_alg, "verify-alg"); 3632 if (IS_ERR(verify_tfm)) { 3633 verify_tfm = NULL; 3634 goto disconnect; 3635 } 3636 } 3637 3638 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3639 if (device->state.conn == C_WF_REPORT_PARAMS) { 3640 drbd_err(device, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 3641 old_net_conf->csums_alg, p->csums_alg); 3642 goto disconnect; 3643 } 3644 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3645 p->csums_alg, "csums-alg"); 3646 if (IS_ERR(csums_tfm)) { 3647 csums_tfm = NULL; 3648 goto disconnect; 3649 } 3650 } 3651 3652 if (apv > 94 && new_disk_conf) { 3653 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3654 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3655 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3656 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3657 3658 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3659 if (fifo_size != device->rs_plan_s->size) { 3660 new_plan = fifo_alloc(fifo_size); 3661 if (!new_plan) { 3662 drbd_err(device, "kmalloc of fifo_buffer failed"); 3663 put_ldev(device); 3664 goto disconnect; 3665 } 3666 } 3667 } 3668 3669 if (verify_tfm || csums_tfm) { 3670 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); 3671 if (!new_net_conf) { 3672 drbd_err(device, "Allocation of new net_conf failed\n"); 3673 goto disconnect; 3674 } 3675 3676 *new_net_conf = *old_net_conf; 3677 3678 if (verify_tfm) { 3679 strcpy(new_net_conf->verify_alg, p->verify_alg); 3680 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3681 crypto_free_hash(peer_device->connection->verify_tfm); 3682 peer_device->connection->verify_tfm = verify_tfm; 3683 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3684 } 3685 if (csums_tfm) { 3686 strcpy(new_net_conf->csums_alg, p->csums_alg); 3687 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3688 crypto_free_hash(peer_device->connection->csums_tfm); 3689 peer_device->connection->csums_tfm = csums_tfm; 3690 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3691 } 3692 rcu_assign_pointer(connection->net_conf, new_net_conf); 3693 } 3694 } 3695 3696 if (new_disk_conf) { 3697 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3698 put_ldev(device); 3699 } 3700 3701 if (new_plan) { 3702 old_plan = device->rs_plan_s; 3703 rcu_assign_pointer(device->rs_plan_s, new_plan); 3704 } 3705 3706 mutex_unlock(&connection->resource->conf_update); 3707 synchronize_rcu(); 3708 if (new_net_conf) 3709 kfree(old_net_conf); 3710 kfree(old_disk_conf); 3711 kfree(old_plan); 3712 3713 return 0; 3714 3715 reconnect: 3716 if (new_disk_conf) { 3717 put_ldev(device); 3718 kfree(new_disk_conf); 3719 } 3720 mutex_unlock(&connection->resource->conf_update); 3721 return -EIO; 3722 3723 disconnect: 3724 kfree(new_plan); 3725 if (new_disk_conf) { 3726 put_ldev(device); 3727 kfree(new_disk_conf); 3728 } 3729 mutex_unlock(&connection->resource->conf_update); 3730 /* just for completeness: actually not needed, 3731 * as this is not reached if csums_tfm was ok. */ 3732 crypto_free_hash(csums_tfm); 3733 /* but free the verify_tfm again, if csums_tfm did not work out */ 3734 crypto_free_hash(verify_tfm); 3735 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3736 return -EIO; 3737 } 3738 3739 /* warn if the arguments differ by more than 12.5% */ 3740 static void warn_if_differ_considerably(struct drbd_device *device, 3741 const char *s, sector_t a, sector_t b) 3742 { 3743 sector_t d; 3744 if (a == 0 || b == 0) 3745 return; 3746 d = (a > b) ? (a - b) : (b - a); 3747 if (d > (a>>3) || d > (b>>3)) 3748 drbd_warn(device, "Considerable difference in %s: %llus vs. 
%llus\n", s, 3749 (unsigned long long)a, (unsigned long long)b); 3750 } 3751 3752 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3753 { 3754 struct drbd_peer_device *peer_device; 3755 struct drbd_device *device; 3756 struct p_sizes *p = pi->data; 3757 enum determine_dev_size dd = DS_UNCHANGED; 3758 sector_t p_size, p_usize, p_csize, my_usize; 3759 int ldsc = 0; /* local disk size changed */ 3760 enum dds_flags ddsf; 3761 3762 peer_device = conn_peer_device(connection, pi->vnr); 3763 if (!peer_device) 3764 return config_unknown_volume(connection, pi); 3765 device = peer_device->device; 3766 3767 p_size = be64_to_cpu(p->d_size); 3768 p_usize = be64_to_cpu(p->u_size); 3769 p_csize = be64_to_cpu(p->c_size); 3770 3771 /* just store the peer's disk size for now. 3772 * we still need to figure out whether we accept that. */ 3773 device->p_size = p_size; 3774 3775 if (get_ldev(device)) { 3776 rcu_read_lock(); 3777 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3778 rcu_read_unlock(); 3779 3780 warn_if_differ_considerably(device, "lower level device sizes", 3781 p_size, drbd_get_max_capacity(device->ldev)); 3782 warn_if_differ_considerably(device, "user requested size", 3783 p_usize, my_usize); 3784 3785 /* if this is the first connect, or an otherwise expected 3786 * param exchange, choose the minimum */ 3787 if (device->state.conn == C_WF_REPORT_PARAMS) 3788 p_usize = min_not_zero(my_usize, p_usize); 3789 3790 /* Never shrink a device with usable data during connect. 3791 But allow online shrinking if we are connected. */ 3792 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) < 3793 drbd_get_capacity(device->this_bdev) && 3794 device->state.disk >= D_OUTDATED && 3795 device->state.conn < C_CONNECTED) { 3796 drbd_err(device, "The peer's disk size is too small!\n"); 3797 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3798 put_ldev(device); 3799 return -EIO; 3800 } 3801 3802 if (my_usize != p_usize) { 3803 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3804 3805 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3806 if (!new_disk_conf) { 3807 drbd_err(device, "Allocation of new disk_conf failed\n"); 3808 put_ldev(device); 3809 return -ENOMEM; 3810 } 3811 3812 mutex_lock(&connection->resource->conf_update); 3813 old_disk_conf = device->ldev->disk_conf; 3814 *new_disk_conf = *old_disk_conf; 3815 new_disk_conf->disk_size = p_usize; 3816 3817 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3818 mutex_unlock(&connection->resource->conf_update); 3819 synchronize_rcu(); 3820 kfree(old_disk_conf); 3821 3822 drbd_info(device, "Peer sets u_size to %lu sectors\n", 3823 (unsigned long)my_usize); 3824 } 3825 3826 put_ldev(device); 3827 } 3828 3829 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3830 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size(). 3831 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 3832 drbd_reconsider_max_bio_size(), we can be sure that after 3833 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */ 3834 3835 ddsf = be16_to_cpu(p->dds_flags); 3836 if (get_ldev(device)) { 3837 drbd_reconsider_max_bio_size(device, device->ldev); 3838 dd = drbd_determine_dev_size(device, ddsf, NULL); 3839 put_ldev(device); 3840 if (dd == DS_ERROR) 3841 return -EIO; 3842 drbd_md_sync(device); 3843 } else { 3844 /* 3845 * I am diskless, need to accept the peer's *current* size. 
3846 * I must NOT accept the peer's backing disk size, 3847 * it may have been larger than mine all along... 3848 * 3849 * At this point, the peer knows more about my disk, or at 3850 * least about what we last agreed upon, than myself. 3851 * So if his c_size is less than his d_size, the most likely 3852 * reason is that *my* d_size was smaller last time we checked. 3853 * 3854 * However, if he sends a zero current size, 3855 * take his (user-capped or) backing disk size anyway. 3856 */ 3857 drbd_reconsider_max_bio_size(device, NULL); 3858 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size); 3859 } 3860 3861 if (get_ldev(device)) { 3862 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 3863 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 3864 ldsc = 1; 3865 } 3866 3867 put_ldev(device); 3868 } 3869 3870 if (device->state.conn > C_WF_REPORT_PARAMS) { 3871 if (be64_to_cpu(p->c_size) != 3872 drbd_get_capacity(device->this_bdev) || ldsc) { 3873 /* we have different sizes, probably peer 3874 * needs to know my new size... */ 3875 drbd_send_sizes(peer_device, 0, ddsf); 3876 } 3877 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 3878 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 3879 if (device->state.pdsk >= D_INCONSISTENT && 3880 device->state.disk >= D_INCONSISTENT) { 3881 if (ddsf & DDSF_NO_RESYNC) 3882 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 3883 else 3884 resync_after_online_grow(device); 3885 } else 3886 set_bit(RESYNC_AFTER_NEG, &device->flags); 3887 } 3888 } 3889 3890 return 0; 3891 } 3892 3893 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 3894 { 3895 struct drbd_peer_device *peer_device; 3896 struct drbd_device *device; 3897 struct p_uuids *p = pi->data; 3898 u64 *p_uuid; 3899 int i, updated_uuids = 0; 3900 3901 peer_device = conn_peer_device(connection, pi->vnr); 3902 if (!peer_device) 3903 return config_unknown_volume(connection, pi); 3904 device = peer_device->device; 3905 3906 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 3907 if (!p_uuid) { 3908 drbd_err(device, "kmalloc of p_uuid failed\n"); 3909 return -ENOMEM; 3910 } 3911 3912 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 3913 p_uuid[i] = be64_to_cpu(p->uuid[i]); 3914 3915 kfree(device->p_uuid); 3916 device->p_uuid = p_uuid; 3917 3918 if (device->state.conn < C_CONNECTED && 3919 device->state.disk < D_INCONSISTENT && 3920 device->state.role == R_PRIMARY && 3921 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 3922 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 3923 (unsigned long long)device->ed_uuid); 3924 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3925 return -EIO; 3926 } 3927 3928 if (get_ldev(device)) { 3929 int skip_initial_sync = 3930 device->state.conn == C_CONNECTED && 3931 peer_device->connection->agreed_pro_version >= 90 && 3932 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 3933 (p_uuid[UI_FLAGS] & 8); 3934 if (skip_initial_sync) { 3935 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 3936 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 3937 "clear_n_write from receive_uuids", 3938 BM_LOCKED_TEST_ALLOWED); 3939 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 3940 _drbd_uuid_set(device, UI_BITMAP, 0); 3941 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 3942 CS_VERBOSE, NULL); 3943
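/* Bitmap cleared and both current UUIDs taken over from the peer: the two nodes are now identical by definition, so record that in the meta data right away. */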
drbd_md_sync(device); 3944 updated_uuids = 1; 3945 } 3946 put_ldev(device); 3947 } else if (device->state.disk < D_INCONSISTENT && 3948 device->state.role == R_PRIMARY) { 3949 /* I am a diskless primary, the peer just created a new current UUID 3950 for me. */ 3951 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3952 } 3953 3954 /* Before we test for the disk state, we should wait until an eventually 3955 ongoing cluster wide state change is finished. That is important if 3956 we are primary and are detaching from our disk. We need to see the 3957 new disk state... */ 3958 mutex_lock(device->state_mutex); 3959 mutex_unlock(device->state_mutex); 3960 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 3961 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 3962 3963 if (updated_uuids) 3964 drbd_print_uuids(device, "receiver updated UUIDs to"); 3965 3966 return 0; 3967 } 3968 3969 /** 3970 * convert_state() - Converts the peer's view of the cluster state to our point of view 3971 * @ps: The state as seen by the peer. 3972 */ 3973 static union drbd_state convert_state(union drbd_state ps) 3974 { 3975 union drbd_state ms; 3976 3977 static enum drbd_conns c_tab[] = { 3978 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 3979 [C_CONNECTED] = C_CONNECTED, 3980 3981 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 3982 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 3983 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 3984 [C_VERIFY_S] = C_VERIFY_T, 3985 [C_MASK] = C_MASK, 3986 }; 3987 3988 ms.i = ps.i; 3989 3990 ms.conn = c_tab[ps.conn]; 3991 ms.peer = ps.role; 3992 ms.role = ps.peer; 3993 ms.pdsk = ps.disk; 3994 ms.disk = ps.pdsk; 3995 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 3996 3997 return ms; 3998 } 3999 4000 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 4001 { 4002 struct drbd_peer_device *peer_device; 4003 struct drbd_device *device; 4004 struct p_req_state *p = pi->data; 4005 union drbd_state mask, val; 4006 enum drbd_state_rv rv; 4007 4008 peer_device = conn_peer_device(connection, pi->vnr); 4009 if (!peer_device) 4010 return -EIO; 4011 device = peer_device->device; 4012 4013 mask.i = be32_to_cpu(p->mask); 4014 val.i = be32_to_cpu(p->val); 4015 4016 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 4017 mutex_is_locked(device->state_mutex)) { 4018 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 4019 return 0; 4020 } 4021 4022 mask = convert_state(mask); 4023 val = convert_state(val); 4024 4025 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 4026 drbd_send_sr_reply(peer_device, rv); 4027 4028 drbd_md_sync(device); 4029 4030 return 0; 4031 } 4032 4033 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 4034 { 4035 struct p_req_state *p = pi->data; 4036 union drbd_state mask, val; 4037 enum drbd_state_rv rv; 4038 4039 mask.i = be32_to_cpu(p->mask); 4040 val.i = be32_to_cpu(p->val); 4041 4042 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 4043 mutex_is_locked(&connection->cstate_mutex)) { 4044 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 4045 return 0; 4046 } 4047 4048 mask = convert_state(mask); 4049 val = convert_state(val); 4050 4051 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 4052 conn_send_sr_reply(connection, rv); 4053 4054 return 0; 4055 } 4056 4057 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 4058 { 4059 struct 
drbd_peer_device *peer_device; 4060 struct drbd_device *device; 4061 struct p_state *p = pi->data; 4062 union drbd_state os, ns, peer_state; 4063 enum drbd_disk_state real_peer_disk; 4064 enum chg_state_flags cs_flags; 4065 int rv; 4066 4067 peer_device = conn_peer_device(connection, pi->vnr); 4068 if (!peer_device) 4069 return config_unknown_volume(connection, pi); 4070 device = peer_device->device; 4071 4072 peer_state.i = be32_to_cpu(p->state); 4073 4074 real_peer_disk = peer_state.disk; 4075 if (peer_state.disk == D_NEGOTIATING) { 4076 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 4077 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 4078 } 4079 4080 spin_lock_irq(&device->resource->req_lock); 4081 retry: 4082 os = ns = drbd_read_state(device); 4083 spin_unlock_irq(&device->resource->req_lock); 4084 4085 /* If some other part of the code (ack_receiver thread, timeout) 4086 * already decided to close the connection again, 4087 * we must not "re-establish" it here. */ 4088 if (os.conn <= C_TEAR_DOWN) 4089 return -ECONNRESET; 4090 4091 /* If this is the "end of sync" confirmation, usually the peer disk 4092 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 4093 * set) resync started in PausedSyncT, or if the timing of pause-/ 4094 * unpause-sync events has been "just right", the peer disk may 4095 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 4096 */ 4097 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 4098 real_peer_disk == D_UP_TO_DATE && 4099 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 4100 /* If we are (becoming) SyncSource, but peer is still in sync 4101 * preparation, ignore its uptodate-ness to avoid flapping, it 4102 * will change to inconsistent once the peer reaches active 4103 * syncing states. 4104 * It may have changed syncer-paused flags, however, so we 4105 * cannot ignore this completely. */ 4106 if (peer_state.conn > C_CONNECTED && 4107 peer_state.conn < C_SYNC_SOURCE) 4108 real_peer_disk = D_INCONSISTENT; 4109 4110 /* if peer_state changes to connected at the same time, 4111 * it explicitly notifies us that it finished resync. 4112 * Maybe we should finish it up, too? */ 4113 else if (os.conn >= C_SYNC_SOURCE && 4114 peer_state.conn == C_CONNECTED) { 4115 if (drbd_bm_total_weight(device) <= device->rs_failed) 4116 drbd_resync_finished(device); 4117 return 0; 4118 } 4119 } 4120 4121 /* explicit verify finished notification, stop sector reached. */ 4122 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 4123 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 4124 ov_out_of_sync_print(device); 4125 drbd_resync_finished(device); 4126 return 0; 4127 } 4128 4129 /* peer says his disk is inconsistent, while we think it is uptodate, 4130 * and this happens while the peer still thinks we have a sync going on, 4131 * but we think we are already done with the sync. 4132 * We ignore this to avoid flapping pdsk. 4133 * This should not happen, if the peer is a recent version of drbd. 
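* (Presumably the peer generated that state report before it processed the end-of-resync transition, so its view of us is simply outdated.)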
*/ 4134 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 4135 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 4136 real_peer_disk = D_UP_TO_DATE; 4137 4138 if (ns.conn == C_WF_REPORT_PARAMS) 4139 ns.conn = C_CONNECTED; 4140 4141 if (peer_state.conn == C_AHEAD) 4142 ns.conn = C_BEHIND; 4143 4144 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 4145 get_ldev_if_state(device, D_NEGOTIATING)) { 4146 int cr; /* consider resync */ 4147 4148 /* if we established a new connection */ 4149 cr = (os.conn < C_CONNECTED); 4150 /* if we had an established connection 4151 * and one of the nodes newly attaches a disk */ 4152 cr |= (os.conn == C_CONNECTED && 4153 (peer_state.disk == D_NEGOTIATING || 4154 os.disk == D_NEGOTIATING)); 4155 /* if we have both been inconsistent, and the peer has been 4156 * forced to be UpToDate with --overwrite-data */ 4157 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 4158 /* if we had been plain connected, and the admin requested to 4159 * start a sync by "invalidate" or "invalidate-remote" */ 4160 cr |= (os.conn == C_CONNECTED && 4161 (peer_state.conn >= C_STARTING_SYNC_S && 4162 peer_state.conn <= C_WF_BITMAP_T)); 4163 4164 if (cr) 4165 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 4166 4167 put_ldev(device); 4168 if (ns.conn == C_MASK) { 4169 ns.conn = C_CONNECTED; 4170 if (device->state.disk == D_NEGOTIATING) { 4171 drbd_force_state(device, NS(disk, D_FAILED)); 4172 } else if (peer_state.disk == D_NEGOTIATING) { 4173 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 4174 peer_state.disk = D_DISKLESS; 4175 real_peer_disk = D_DISKLESS; 4176 } else { 4177 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 4178 return -EIO; 4179 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 4180 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4181 return -EIO; 4182 } 4183 } 4184 } 4185 4186 spin_lock_irq(&device->resource->req_lock); 4187 if (os.i != drbd_read_state(device).i) 4188 goto retry; 4189 clear_bit(CONSIDER_RESYNC, &device->flags); 4190 ns.peer = peer_state.role; 4191 ns.pdsk = real_peer_disk; 4192 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 4193 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 4194 ns.disk = device->new_state_tmp.disk; 4195 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4196 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4197 test_bit(NEW_CUR_UUID, &device->flags)) { 4198 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4199 for temporal network outages! 
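A peer that rebooted has lost the requests we would resend, so instead of thawing IO we clear the transfer log, generate a new current UUID and drop the connection with a protocol error.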
*/ 4200 spin_unlock_irq(&device->resource->req_lock); 4201 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4202 tl_clear(peer_device->connection); 4203 drbd_uuid_new_current(device); 4204 clear_bit(NEW_CUR_UUID, &device->flags); 4205 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4206 return -EIO; 4207 } 4208 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4209 ns = drbd_read_state(device); 4210 spin_unlock_irq(&device->resource->req_lock); 4211 4212 if (rv < SS_SUCCESS) { 4213 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4214 return -EIO; 4215 } 4216 4217 if (os.conn > C_WF_REPORT_PARAMS) { 4218 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4219 peer_state.disk != D_NEGOTIATING ) { 4220 /* we want resync, peer has not yet decided to sync... */ 4221 /* Nowadays only used when forcing a node into primary role and 4222 setting its disk to UpToDate with that */ 4223 drbd_send_uuids(peer_device); 4224 drbd_send_current_state(peer_device); 4225 } 4226 } 4227 4228 clear_bit(DISCARD_MY_DATA, &device->flags); 4229 4230 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4231 4232 return 0; 4233 } 4234 4235 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4236 { 4237 struct drbd_peer_device *peer_device; 4238 struct drbd_device *device; 4239 struct p_rs_uuid *p = pi->data; 4240 4241 peer_device = conn_peer_device(connection, pi->vnr); 4242 if (!peer_device) 4243 return -EIO; 4244 device = peer_device->device; 4245 4246 wait_event(device->misc_wait, 4247 device->state.conn == C_WF_SYNC_UUID || 4248 device->state.conn == C_BEHIND || 4249 device->state.conn < C_CONNECTED || 4250 device->state.disk < D_NEGOTIATING); 4251 4252 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4253 4254 /* Here the _drbd_uuid_ functions are right, current should 4255 _not_ be rotated into the history */ 4256 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4257 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4258 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4259 4260 drbd_print_uuids(device, "updated sync uuid"); 4261 drbd_start_resync(device, C_SYNC_TARGET); 4262 4263 put_ldev(device); 4264 } else 4265 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4266 4267 return 0; 4268 } 4269 4270 /** 4271 * receive_bitmap_plain 4272 * 4273 * Return 0 when done, 1 when another iteration is needed, and a negative error 4274 * code upon failure. 
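* @peer_device: DRBD peer device the bitmap is received for
* @size: number of payload bytes announced for this packet
* @p: receive buffer holding the little endian bitmap words
* @c: bitmap transfer context (tracks word/bit offsets across packets)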
4275 */ 4276 static int 4277 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4278 unsigned long *p, struct bm_xfer_ctx *c) 4279 { 4280 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4281 drbd_header_size(peer_device->connection); 4282 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4283 c->bm_words - c->word_offset); 4284 unsigned int want = num_words * sizeof(*p); 4285 int err; 4286 4287 if (want != size) { 4288 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4289 return -EIO; 4290 } 4291 if (want == 0) 4292 return 0; 4293 err = drbd_recv_all(peer_device->connection, p, want); 4294 if (err) 4295 return err; 4296 4297 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4298 4299 c->word_offset += num_words; 4300 c->bit_offset = c->word_offset * BITS_PER_LONG; 4301 if (c->bit_offset > c->bm_bits) 4302 c->bit_offset = c->bm_bits; 4303 4304 return 1; 4305 } 4306 4307 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4308 { 4309 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4310 } 4311 4312 static int dcbp_get_start(struct p_compressed_bm *p) 4313 { 4314 return (p->encoding & 0x80) != 0; 4315 } 4316 4317 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4318 { 4319 return (p->encoding >> 4) & 0x7; 4320 } 4321 4322 /** 4323 * recv_bm_rle_bits 4324 * 4325 * Return 0 when done, 1 when another iteration is needed, and a negative error 4326 * code upon failure. 4327 */ 4328 static int 4329 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4330 struct p_compressed_bm *p, 4331 struct bm_xfer_ctx *c, 4332 unsigned int len) 4333 { 4334 struct bitstream bs; 4335 u64 look_ahead; 4336 u64 rl; 4337 u64 tmp; 4338 unsigned long s = c->bit_offset; 4339 unsigned long e; 4340 int toggle = dcbp_get_start(p); 4341 int have; 4342 int bits; 4343 4344 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4345 4346 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4347 if (bits < 0) 4348 return -EIO; 4349 4350 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4351 bits = vli_decode_bits(&rl, look_ahead); 4352 if (bits <= 0) 4353 return -EIO; 4354 4355 if (toggle) { 4356 e = s + rl -1; 4357 if (e >= c->bm_bits) { 4358 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4359 return -EIO; 4360 } 4361 _drbd_bm_set_bits(peer_device->device, s, e); 4362 } 4363 4364 if (have < bits) { 4365 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4366 have, bits, look_ahead, 4367 (unsigned int)(bs.cur.b - p->code), 4368 (unsigned int)bs.buf_len); 4369 return -EIO; 4370 } 4371 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4372 if (likely(bits < 64)) 4373 look_ahead >>= bits; 4374 else 4375 look_ahead = 0; 4376 have -= bits; 4377 4378 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4379 if (bits < 0) 4380 return -EIO; 4381 look_ahead |= tmp << have; 4382 have += bits; 4383 } 4384 4385 c->bit_offset = s; 4386 bm_xfer_ctx_bit_to_word_offset(c); 4387 4388 return (s != c->bm_bits); 4389 } 4390 4391 /** 4392 * decode_bitmap_c 4393 * 4394 * Return 0 when done, 1 when another iteration is needed, and a negative error 4395 * code upon failure. 
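* @peer_device: DRBD peer device the compressed bitmap is received for
* @p: P_COMPRESSED_BITMAP payload, starting with the encoding byte
* @c: bitmap transfer context shared with receive_bitmap()
* @len: payload length in bytes, including sizeof(*p)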
4396 */ 4397 static int 4398 decode_bitmap_c(struct drbd_peer_device *peer_device, 4399 struct p_compressed_bm *p, 4400 struct bm_xfer_ctx *c, 4401 unsigned int len) 4402 { 4403 if (dcbp_get_code(p) == RLE_VLI_Bits) 4404 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p)); 4405 4406 /* other variants had been implemented for evaluation, 4407 * but have been dropped as this one turned out to be "best" 4408 * during all our tests. */ 4409 4410 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 4411 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4412 return -EIO; 4413 } 4414 4415 void INFO_bm_xfer_stats(struct drbd_device *device, 4416 const char *direction, struct bm_xfer_ctx *c) 4417 { 4418 /* what would it take to transfer it "plaintext" */ 4419 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection); 4420 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size; 4421 unsigned int plain = 4422 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) + 4423 c->bm_words * sizeof(unsigned long); 4424 unsigned int total = c->bytes[0] + c->bytes[1]; 4425 unsigned int r; 4426 4427 /* total can not be zero. but just in case: */ 4428 if (total == 0) 4429 return; 4430 4431 /* don't report if not compressed */ 4432 if (total >= plain) 4433 return; 4434 4435 /* total < plain. check for overflow, still */ 4436 r = (total > UINT_MAX/1000) ? (total / (plain/1000)) 4437 : (1000 * total / plain); 4438 4439 if (r > 1000) 4440 r = 1000; 4441 4442 r = 1000 - r; 4443 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), " 4444 "total %u; compression: %u.%u%%\n", 4445 direction, 4446 c->bytes[1], c->packets[1], 4447 c->bytes[0], c->packets[0], 4448 total, r/10, r % 10); 4449 } 4450 4451 /* Since we are processing the bitfield from lower addresses to higher, 4452 it does not matter if the process it in 32 bit chunks or 64 bit 4453 chunks as long as it is little endian. (Understand it as byte stream, 4454 beginning with the lowest byte...) If we would use big endian 4455 we would need to process it from the highest address to the lowest, 4456 in order to be agnostic to the 32 vs 64 bits issue. 4457 4458 returns 0 on failure, 1 if we successfully received it. */ 4459 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi) 4460 { 4461 struct drbd_peer_device *peer_device; 4462 struct drbd_device *device; 4463 struct bm_xfer_ctx c; 4464 int err; 4465 4466 peer_device = conn_peer_device(connection, pi->vnr); 4467 if (!peer_device) 4468 return -EIO; 4469 device = peer_device->device; 4470 4471 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED); 4472 /* you are supposed to send additional out-of-sync information 4473 * if you actually set bits during this phase */ 4474 4475 c = (struct bm_xfer_ctx) { 4476 .bm_bits = drbd_bm_bits(device), 4477 .bm_words = drbd_bm_words(device), 4478 }; 4479 4480 for(;;) { 4481 if (pi->cmd == P_BITMAP) 4482 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c); 4483 else if (pi->cmd == P_COMPRESSED_BITMAP) { 4484 /* MAYBE: sanity check that we speak proto >= 90, 4485 * and the feature is enabled! 
*/ 4486 struct p_compressed_bm *p = pi->data; 4487 4488 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) { 4489 drbd_err(device, "ReportCBitmap packet too large\n"); 4490 err = -EIO; 4491 goto out; 4492 } 4493 if (pi->size <= sizeof(*p)) { 4494 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size); 4495 err = -EIO; 4496 goto out; 4497 } 4498 err = drbd_recv_all(peer_device->connection, p, pi->size); 4499 if (err) 4500 goto out; 4501 err = decode_bitmap_c(peer_device, p, &c, pi->size); 4502 } else { 4503 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd); 4504 err = -EIO; 4505 goto out; 4506 } 4507 4508 c.packets[pi->cmd == P_BITMAP]++; 4509 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size; 4510 4511 if (err <= 0) { 4512 if (err < 0) 4513 goto out; 4514 break; 4515 } 4516 err = drbd_recv_header(peer_device->connection, pi); 4517 if (err) 4518 goto out; 4519 } 4520 4521 INFO_bm_xfer_stats(device, "receive", &c); 4522 4523 if (device->state.conn == C_WF_BITMAP_T) { 4524 enum drbd_state_rv rv; 4525 4526 err = drbd_send_bitmap(device); 4527 if (err) 4528 goto out; 4529 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 4530 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4531 D_ASSERT(device, rv == SS_SUCCESS); 4532 } else if (device->state.conn != C_WF_BITMAP_S) { 4533 /* admin may have requested C_DISCONNECTING, 4534 * other threads may have noticed network errors */ 4535 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4536 drbd_conn_str(device->state.conn)); 4537 } 4538 err = 0; 4539 4540 out: 4541 drbd_bm_unlock(device); 4542 if (!err && device->state.conn == C_WF_BITMAP_S) 4543 drbd_start_resync(device, C_SYNC_SOURCE); 4544 return err; 4545 } 4546 4547 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4548 { 4549 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4550 pi->cmd, pi->size); 4551 4552 return ignore_remaining_packet(connection, pi); 4553 } 4554 4555 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4556 { 4557 /* Make sure we've acked all the TCP data associated 4558 * with the data requests being unplugged */ 4559 drbd_tcp_quickack(connection->data.socket); 4560 4561 return 0; 4562 } 4563 4564 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4565 { 4566 struct drbd_peer_device *peer_device; 4567 struct drbd_device *device; 4568 struct p_block_desc *p = pi->data; 4569 4570 peer_device = conn_peer_device(connection, pi->vnr); 4571 if (!peer_device) 4572 return -EIO; 4573 device = peer_device->device; 4574 4575 switch (device->state.conn) { 4576 case C_WF_SYNC_UUID: 4577 case C_WF_BITMAP_T: 4578 case C_BEHIND: 4579 break; 4580 default: 4581 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4582 drbd_conn_str(device->state.conn)); 4583 } 4584 4585 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4586 4587 return 0; 4588 } 4589 4590 struct data_cmd { 4591 int expect_payload; 4592 size_t pkt_size; 4593 int (*fn)(struct drbd_connection *, struct packet_info *); 4594 }; 4595 4596 static struct data_cmd drbd_cmd_handler[] = { 4597 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4598 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4599 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), 
receive_RSDataReply } , 4600 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4601 [P_BITMAP] = { 1, 0, receive_bitmap } , 4602 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4603 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4604 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4605 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4606 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4607 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4608 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4609 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4610 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4611 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4612 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4613 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4614 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4615 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4616 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4617 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4618 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4619 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4620 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4621 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data }, 4622 }; 4623 4624 static void drbdd(struct drbd_connection *connection) 4625 { 4626 struct packet_info pi; 4627 size_t shs; /* sub header size */ 4628 int err; 4629 4630 while (get_t_state(&connection->receiver) == RUNNING) { 4631 struct data_cmd *cmd; 4632 4633 drbd_thread_current_set_cpu(&connection->receiver); 4634 update_receiver_timing_details(connection, drbd_recv_header); 4635 if (drbd_recv_header(connection, &pi)) 4636 goto err_out; 4637 4638 cmd = &drbd_cmd_handler[pi.cmd]; 4639 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4640 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4641 cmdname(pi.cmd), pi.cmd); 4642 goto err_out; 4643 } 4644 4645 shs = cmd->pkt_size; 4646 if (pi.size > shs && !cmd->expect_payload) { 4647 drbd_err(connection, "No payload expected %s l:%d\n", 4648 cmdname(pi.cmd), pi.size); 4649 goto err_out; 4650 } 4651 4652 if (shs) { 4653 update_receiver_timing_details(connection, drbd_recv_all_warn); 4654 err = drbd_recv_all_warn(connection, pi.data, shs); 4655 if (err) 4656 goto err_out; 4657 pi.size -= shs; 4658 } 4659 4660 update_receiver_timing_details(connection, cmd->fn); 4661 err = cmd->fn(connection, &pi); 4662 if (err) { 4663 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4664 cmdname(pi.cmd), err, pi.size); 4665 goto err_out; 4666 } 4667 } 4668 return; 4669 4670 err_out: 4671 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4672 } 4673 4674 static void conn_disconnect(struct drbd_connection *connection) 4675 { 4676 struct drbd_peer_device *peer_device; 4677 enum drbd_conns oc; 4678 int vnr; 4679 4680 if (connection->cstate == C_STANDALONE) 4681 return; 4682 4683 /* We are about to start the cleanup after connection loss. 4684 * Make sure drbd_make_request knows about that. 4685 * Usually we should be in some network failure state already, 4686 * but just in case we are not, we fix it up here. 
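*
* Roughly: stop the ack receiver and release the sockets first, then let
* every volume clean up in drbd_disconnected(), and only then move the
* connection on to C_UNCONNECTED (or C_STANDALONE if the admin had
* requested the disconnect).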
4687 */ 4688 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4689 4690 /* ack_receiver does not clean up anything. it must not interfere, either */ 4691 drbd_thread_stop(&connection->ack_receiver); 4692 if (connection->ack_sender) { 4693 destroy_workqueue(connection->ack_sender); 4694 connection->ack_sender = NULL; 4695 } 4696 drbd_free_sock(connection); 4697 4698 rcu_read_lock(); 4699 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 4700 struct drbd_device *device = peer_device->device; 4701 kref_get(&device->kref); 4702 rcu_read_unlock(); 4703 drbd_disconnected(peer_device); 4704 kref_put(&device->kref, drbd_destroy_device); 4705 rcu_read_lock(); 4706 } 4707 rcu_read_unlock(); 4708 4709 if (!list_empty(&connection->current_epoch->list)) 4710 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 4711 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4712 atomic_set(&connection->current_epoch->epoch_size, 0); 4713 connection->send.seen_any_write_yet = false; 4714 4715 drbd_info(connection, "Connection closed\n"); 4716 4717 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 4718 conn_try_outdate_peer_async(connection); 4719 4720 spin_lock_irq(&connection->resource->req_lock); 4721 oc = connection->cstate; 4722 if (oc >= C_UNCONNECTED) 4723 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 4724 4725 spin_unlock_irq(&connection->resource->req_lock); 4726 4727 if (oc == C_DISCONNECTING) 4728 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 4729 } 4730 4731 static int drbd_disconnected(struct drbd_peer_device *peer_device) 4732 { 4733 struct drbd_device *device = peer_device->device; 4734 unsigned int i; 4735 4736 /* wait for current activity to cease. */ 4737 spin_lock_irq(&device->resource->req_lock); 4738 _drbd_wait_ee_list_empty(device, &device->active_ee); 4739 _drbd_wait_ee_list_empty(device, &device->sync_ee); 4740 _drbd_wait_ee_list_empty(device, &device->read_ee); 4741 spin_unlock_irq(&device->resource->req_lock); 4742 4743 /* We do not have data structures that would allow us to 4744 * get the rs_pending_cnt down to 0 again. 4745 * * On C_SYNC_TARGET we do not have any data structures describing 4746 * the pending RSDataRequest's we have sent. 4747 * * On C_SYNC_SOURCE there is no data structure that tracks 4748 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 4749 * And no, it is not the sum of the reference counts in the 4750 * resync_LRU. The resync_LRU tracks the whole operation including 4751 * the disk-IO, while the rs_pending_cnt only tracks the blocks 4752 * on the fly. */ 4753 drbd_rs_cancel_all(device); 4754 device->rs_total = 0; 4755 device->rs_failed = 0; 4756 atomic_set(&device->rs_pending_cnt, 0); 4757 wake_up(&device->misc_wait); 4758 4759 del_timer_sync(&device->resync_timer); 4760 resync_timer_fn((unsigned long)device); 4761 4762 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 4763 * w_make_resync_request etc. which may still be on the worker queue 4764 * to be "canceled" */ 4765 drbd_flush_workqueue(&peer_device->connection->sender_work); 4766 4767 drbd_finish_peer_reqs(device); 4768 4769 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs() 4770 might have queued work again. The one before drbd_finish_peer_reqs() is 4771 necessary to reclaim net_ee in drbd_finish_peer_reqs().
*/ 4772 drbd_flush_workqueue(&peer_device->connection->sender_work); 4773 4774 /* need to do it again, drbd_finish_peer_reqs() may have populated it 4775 * again via drbd_try_clear_on_disk_bm(). */ 4776 drbd_rs_cancel_all(device); 4777 4778 kfree(device->p_uuid); 4779 device->p_uuid = NULL; 4780 4781 if (!drbd_suspended(device)) 4782 tl_clear(peer_device->connection); 4783 4784 drbd_md_sync(device); 4785 4786 /* serialize with bitmap writeout triggered by the state change, 4787 * if any. */ 4788 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags)); 4789 4790 /* tcp_close and release of sendpage pages can be deferred. I don't 4791 * want to use SO_LINGER, because apparently it can be deferred for 4792 * more than 20 seconds (longest time I checked). 4793 * 4794 * Actually we don't care for exactly when the network stack does its 4795 * put_page(), but release our reference on these pages right here. 4796 */ 4797 i = drbd_free_peer_reqs(device, &device->net_ee); 4798 if (i) 4799 drbd_info(device, "net_ee not empty, killed %u entries\n", i); 4800 i = atomic_read(&device->pp_in_use_by_net); 4801 if (i) 4802 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 4803 i = atomic_read(&device->pp_in_use); 4804 if (i) 4805 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 4806 4807 D_ASSERT(device, list_empty(&device->read_ee)); 4808 D_ASSERT(device, list_empty(&device->active_ee)); 4809 D_ASSERT(device, list_empty(&device->sync_ee)); 4810 D_ASSERT(device, list_empty(&device->done_ee)); 4811 4812 return 0; 4813 } 4814 4815 /* 4816 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 4817 * we can agree on is stored in agreed_pro_version. 4818 * 4819 * feature flags and the reserved array should be enough room for future 4820 * enhancements of the handshake protocol, and possible plugins... 4821 * 4822 * for now, they are expected to be zero, but ignored. 4823 */ 4824 static int drbd_send_features(struct drbd_connection *connection) 4825 { 4826 struct drbd_socket *sock; 4827 struct p_connection_features *p; 4828 4829 sock = &connection->data; 4830 p = conn_prepare_command(connection, sock); 4831 if (!p) 4832 return -EIO; 4833 memset(p, 0, sizeof(*p)); 4834 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 4835 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 4836 p->feature_flags = cpu_to_be32(PRO_FEATURES); 4837 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 4838 } 4839 4840 /* 4841 * return values: 4842 * 1 yes, we have a valid connection 4843 * 0 oops, did not work out, please try again 4844 * -1 peer talks different language, 4845 * no point in trying again, please go standalone. 4846 */ 4847 static int drbd_do_features(struct drbd_connection *connection) 4848 { 4849 /* ASSERT current == connection->receiver ... 
*/ 4850 struct p_connection_features *p; 4851 const int expect = sizeof(struct p_connection_features); 4852 struct packet_info pi; 4853 int err; 4854 4855 err = drbd_send_features(connection); 4856 if (err) 4857 return 0; 4858 4859 err = drbd_recv_header(connection, &pi); 4860 if (err) 4861 return 0; 4862 4863 if (pi.cmd != P_CONNECTION_FEATURES) { 4864 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n", 4865 cmdname(pi.cmd), pi.cmd); 4866 return -1; 4867 } 4868 4869 if (pi.size != expect) { 4870 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n", 4871 expect, pi.size); 4872 return -1; 4873 } 4874 4875 p = pi.data; 4876 err = drbd_recv_all_warn(connection, p, expect); 4877 if (err) 4878 return 0; 4879 4880 p->protocol_min = be32_to_cpu(p->protocol_min); 4881 p->protocol_max = be32_to_cpu(p->protocol_max); 4882 if (p->protocol_max == 0) 4883 p->protocol_max = p->protocol_min; 4884 4885 if (PRO_VERSION_MAX < p->protocol_min || 4886 PRO_VERSION_MIN > p->protocol_max) 4887 goto incompat; 4888 4889 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max); 4890 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags); 4891 4892 drbd_info(connection, "Handshake successful: " 4893 "Agreed network protocol version %d\n", connection->agreed_pro_version); 4894 4895 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n", 4896 connection->agreed_features & FF_TRIM ? " " : " not "); 4897 4898 return 1; 4899 4900 incompat: 4901 drbd_err(connection, "incompatible DRBD dialects: " 4902 "I support %d-%d, peer supports %d-%d\n", 4903 PRO_VERSION_MIN, PRO_VERSION_MAX, 4904 p->protocol_min, p->protocol_max); 4905 return -1; 4906 } 4907 4908 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE) 4909 static int drbd_do_auth(struct drbd_connection *connection) 4910 { 4911 drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n"); 4912 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n"); 4913 return -1; 4914 } 4915 #else 4916 #define CHALLENGE_LEN 64 4917 4918 /* Return value: 4919 1 - auth succeeded, 4920 0 - failed, try again (network error), 4921 -1 - auth failed, don't try again. 4922 */ 4923 4924 static int drbd_do_auth(struct drbd_connection *connection) 4925 { 4926 struct drbd_socket *sock; 4927 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */ 4928 struct scatterlist sg; 4929 char *response = NULL; 4930 char *right_response = NULL; 4931 char *peers_ch = NULL; 4932 unsigned int key_len; 4933 char secret[SHARED_SECRET_MAX]; /* 64 byte */ 4934 unsigned int resp_size; 4935 struct hash_desc desc; 4936 struct packet_info pi; 4937 struct net_conf *nc; 4938 int err, rv; 4939 4940 /* FIXME: Put the challenge/response into the preallocated socket buffer.
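The exchange below is symmetric: each side sends P_AUTH_CHALLENGE with CHALLENGE_LEN random bytes, answers the peer's challenge with an HMAC over it keyed by the shared secret in a P_AUTH_RESPONSE packet, and compares the response it receives against the digest it computed over its own challenge.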
*/ 4941 4942 rcu_read_lock(); 4943 nc = rcu_dereference(connection->net_conf); 4944 key_len = strlen(nc->shared_secret); 4945 memcpy(secret, nc->shared_secret, key_len); 4946 rcu_read_unlock(); 4947 4948 desc.tfm = connection->cram_hmac_tfm; 4949 desc.flags = 0; 4950 4951 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 4952 if (rv) { 4953 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv); 4954 rv = -1; 4955 goto fail; 4956 } 4957 4958 get_random_bytes(my_challenge, CHALLENGE_LEN); 4959 4960 sock = &connection->data; 4961 if (!conn_prepare_command(connection, sock)) { 4962 rv = 0; 4963 goto fail; 4964 } 4965 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 4966 my_challenge, CHALLENGE_LEN); 4967 if (!rv) 4968 goto fail; 4969 4970 err = drbd_recv_header(connection, &pi); 4971 if (err) { 4972 rv = 0; 4973 goto fail; 4974 } 4975 4976 if (pi.cmd != P_AUTH_CHALLENGE) { 4977 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 4978 cmdname(pi.cmd), pi.cmd); 4979 rv = 0; 4980 goto fail; 4981 } 4982 4983 if (pi.size > CHALLENGE_LEN * 2) { 4984 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 4985 rv = -1; 4986 goto fail; 4987 } 4988 4989 if (pi.size < CHALLENGE_LEN) { 4990 drbd_err(connection, "AuthChallenge payload too small.\n"); 4991 rv = -1; 4992 goto fail; 4993 } 4994 4995 peers_ch = kmalloc(pi.size, GFP_NOIO); 4996 if (peers_ch == NULL) { 4997 drbd_err(connection, "kmalloc of peers_ch failed\n"); 4998 rv = -1; 4999 goto fail; 5000 } 5001 5002 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 5003 if (err) { 5004 rv = 0; 5005 goto fail; 5006 } 5007 5008 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 5009 drbd_err(connection, "Peer presented the same challenge!\n"); 5010 rv = -1; 5011 goto fail; 5012 } 5013 5014 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm); 5015 response = kmalloc(resp_size, GFP_NOIO); 5016 if (response == NULL) { 5017 drbd_err(connection, "kmalloc of response failed\n"); 5018 rv = -1; 5019 goto fail; 5020 } 5021 5022 sg_init_table(&sg, 1); 5023 sg_set_buf(&sg, peers_ch, pi.size); 5024 5025 rv = crypto_hash_digest(&desc, &sg, sg.length, response); 5026 if (rv) { 5027 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5028 rv = -1; 5029 goto fail; 5030 } 5031 5032 if (!conn_prepare_command(connection, sock)) { 5033 rv = 0; 5034 goto fail; 5035 } 5036 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 5037 response, resp_size); 5038 if (!rv) 5039 goto fail; 5040 5041 err = drbd_recv_header(connection, &pi); 5042 if (err) { 5043 rv = 0; 5044 goto fail; 5045 } 5046 5047 if (pi.cmd != P_AUTH_RESPONSE) { 5048 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 5049 cmdname(pi.cmd), pi.cmd); 5050 rv = 0; 5051 goto fail; 5052 } 5053 5054 if (pi.size != resp_size) { 5055 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 5056 rv = 0; 5057 goto fail; 5058 } 5059 5060 err = drbd_recv_all_warn(connection, response , resp_size); 5061 if (err) { 5062 rv = 0; 5063 goto fail; 5064 } 5065 5066 right_response = kmalloc(resp_size, GFP_NOIO); 5067 if (right_response == NULL) { 5068 drbd_err(connection, "kmalloc of right_response failed\n"); 5069 rv = -1; 5070 goto fail; 5071 } 5072 5073 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN); 5074 5075 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response); 5076 if (rv) { 5077 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5078 
rv = -1; 5079 goto fail; 5080 } 5081 5082 rv = !memcmp(response, right_response, resp_size); 5083 5084 if (rv) 5085 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 5086 resp_size); 5087 else 5088 rv = -1; 5089 5090 fail: 5091 kfree(peers_ch); 5092 kfree(response); 5093 kfree(right_response); 5094 5095 return rv; 5096 } 5097 #endif 5098 5099 int drbd_receiver(struct drbd_thread *thi) 5100 { 5101 struct drbd_connection *connection = thi->connection; 5102 int h; 5103 5104 drbd_info(connection, "receiver (re)started\n"); 5105 5106 do { 5107 h = conn_connect(connection); 5108 if (h == 0) { 5109 conn_disconnect(connection); 5110 schedule_timeout_interruptible(HZ); 5111 } 5112 if (h == -1) { 5113 drbd_warn(connection, "Discarding network configuration.\n"); 5114 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5115 } 5116 } while (h == 0); 5117 5118 if (h > 0) 5119 drbdd(connection); 5120 5121 conn_disconnect(connection); 5122 5123 drbd_info(connection, "receiver terminated\n"); 5124 return 0; 5125 } 5126 5127 /* ********* acknowledge sender ******** */ 5128 5129 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5130 { 5131 struct p_req_state_reply *p = pi->data; 5132 int retcode = be32_to_cpu(p->retcode); 5133 5134 if (retcode >= SS_SUCCESS) { 5135 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 5136 } else { 5137 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 5138 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 5139 drbd_set_st_err_str(retcode), retcode); 5140 } 5141 wake_up(&connection->ping_wait); 5142 5143 return 0; 5144 } 5145 5146 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5147 { 5148 struct drbd_peer_device *peer_device; 5149 struct drbd_device *device; 5150 struct p_req_state_reply *p = pi->data; 5151 int retcode = be32_to_cpu(p->retcode); 5152 5153 peer_device = conn_peer_device(connection, pi->vnr); 5154 if (!peer_device) 5155 return -EIO; 5156 device = peer_device->device; 5157 5158 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 5159 D_ASSERT(device, connection->agreed_pro_version < 100); 5160 return got_conn_RqSReply(connection, pi); 5161 } 5162 5163 if (retcode >= SS_SUCCESS) { 5164 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 5165 } else { 5166 set_bit(CL_ST_CHG_FAIL, &device->flags); 5167 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 5168 drbd_set_st_err_str(retcode), retcode); 5169 } 5170 wake_up(&device->state_wait); 5171 5172 return 0; 5173 } 5174 5175 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 5176 { 5177 return drbd_send_ping_ack(connection); 5178 5179 } 5180 5181 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 5182 { 5183 /* restore idle timeout */ 5184 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 5185 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 5186 wake_up(&connection->ping_wait); 5187 5188 return 0; 5189 } 5190 5191 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 5192 { 5193 struct drbd_peer_device *peer_device; 5194 struct drbd_device *device; 5195 struct p_block_ack *p = pi->data; 5196 sector_t sector = be64_to_cpu(p->sector); 5197 int blksize = be32_to_cpu(p->blksize); 5198 5199 peer_device = conn_peer_device(connection, pi->vnr); 5200 if (!peer_device) 5201 return -EIO; 5202 device = peer_device->device; 5203 5204 D_ASSERT(device, 
peer_device->connection->agreed_pro_version >= 89); 5205 5206 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5207 5208 if (get_ldev(device)) { 5209 drbd_rs_complete_io(device, sector); 5210 drbd_set_in_sync(device, sector, blksize); 5211 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 5212 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 5213 put_ldev(device); 5214 } 5215 dec_rs_pending(device); 5216 atomic_add(blksize >> 9, &device->rs_sect_in); 5217 5218 return 0; 5219 } 5220 5221 static int 5222 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector, 5223 struct rb_root *root, const char *func, 5224 enum drbd_req_event what, bool missing_ok) 5225 { 5226 struct drbd_request *req; 5227 struct bio_and_error m; 5228 5229 spin_lock_irq(&device->resource->req_lock); 5230 req = find_request(device, root, id, sector, missing_ok, func); 5231 if (unlikely(!req)) { 5232 spin_unlock_irq(&device->resource->req_lock); 5233 return -EIO; 5234 } 5235 __req_mod(req, what, &m); 5236 spin_unlock_irq(&device->resource->req_lock); 5237 5238 if (m.bio) 5239 complete_master_bio(device, &m); 5240 return 0; 5241 } 5242 5243 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5244 { 5245 struct drbd_peer_device *peer_device; 5246 struct drbd_device *device; 5247 struct p_block_ack *p = pi->data; 5248 sector_t sector = be64_to_cpu(p->sector); 5249 int blksize = be32_to_cpu(p->blksize); 5250 enum drbd_req_event what; 5251 5252 peer_device = conn_peer_device(connection, pi->vnr); 5253 if (!peer_device) 5254 return -EIO; 5255 device = peer_device->device; 5256 5257 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5258 5259 if (p->block_id == ID_SYNCER) { 5260 drbd_set_in_sync(device, sector, blksize); 5261 dec_rs_pending(device); 5262 return 0; 5263 } 5264 switch (pi->cmd) { 5265 case P_RS_WRITE_ACK: 5266 what = WRITE_ACKED_BY_PEER_AND_SIS; 5267 break; 5268 case P_WRITE_ACK: 5269 what = WRITE_ACKED_BY_PEER; 5270 break; 5271 case P_RECV_ACK: 5272 what = RECV_ACKED_BY_PEER; 5273 break; 5274 case P_SUPERSEDED: 5275 what = CONFLICT_RESOLVED; 5276 break; 5277 case P_RETRY_WRITE: 5278 what = POSTPONE_WRITE; 5279 break; 5280 default: 5281 BUG(); 5282 } 5283 5284 return validate_req_change_req_state(device, p->block_id, sector, 5285 &device->write_requests, __func__, 5286 what, false); 5287 } 5288 5289 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5290 { 5291 struct drbd_peer_device *peer_device; 5292 struct drbd_device *device; 5293 struct p_block_ack *p = pi->data; 5294 sector_t sector = be64_to_cpu(p->sector); 5295 int size = be32_to_cpu(p->blksize); 5296 int err; 5297 5298 peer_device = conn_peer_device(connection, pi->vnr); 5299 if (!peer_device) 5300 return -EIO; 5301 device = peer_device->device; 5302 5303 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5304 5305 if (p->block_id == ID_SYNCER) { 5306 dec_rs_pending(device); 5307 drbd_rs_failed_io(device, sector, size); 5308 return 0; 5309 } 5310 5311 err = validate_req_change_req_state(device, p->block_id, sector, 5312 &device->write_requests, __func__, 5313 NEG_ACKED, true); 5314 if (err) { 5315 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5316 The master bio might already be completed, therefore the 5317 request is no longer in the collision hash. */ 5318 /* In Protocol B we might already have got a P_RECV_ACK 5319 but then get a P_NEG_ACK afterwards. 
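Either way the peer could not apply this write, so mark the range out of sync; a later resync will repair it.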
*/ 5320 drbd_set_out_of_sync(device, sector, size); 5321 } 5322 return 0; 5323 } 5324 5325 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5326 { 5327 struct drbd_peer_device *peer_device; 5328 struct drbd_device *device; 5329 struct p_block_ack *p = pi->data; 5330 sector_t sector = be64_to_cpu(p->sector); 5331 5332 peer_device = conn_peer_device(connection, pi->vnr); 5333 if (!peer_device) 5334 return -EIO; 5335 device = peer_device->device; 5336 5337 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5338 5339 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5340 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5341 5342 return validate_req_change_req_state(device, p->block_id, sector, 5343 &device->read_requests, __func__, 5344 NEG_ACKED, false); 5345 } 5346 5347 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5348 { 5349 struct drbd_peer_device *peer_device; 5350 struct drbd_device *device; 5351 sector_t sector; 5352 int size; 5353 struct p_block_ack *p = pi->data; 5354 5355 peer_device = conn_peer_device(connection, pi->vnr); 5356 if (!peer_device) 5357 return -EIO; 5358 device = peer_device->device; 5359 5360 sector = be64_to_cpu(p->sector); 5361 size = be32_to_cpu(p->blksize); 5362 5363 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5364 5365 dec_rs_pending(device); 5366 5367 if (get_ldev_if_state(device, D_FAILED)) { 5368 drbd_rs_complete_io(device, sector); 5369 switch (pi->cmd) { 5370 case P_NEG_RS_DREPLY: 5371 drbd_rs_failed_io(device, sector, size); 5372 case P_RS_CANCEL: 5373 break; 5374 default: 5375 BUG(); 5376 } 5377 put_ldev(device); 5378 } 5379 5380 return 0; 5381 } 5382 5383 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5384 { 5385 struct p_barrier_ack *p = pi->data; 5386 struct drbd_peer_device *peer_device; 5387 int vnr; 5388 5389 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5390 5391 rcu_read_lock(); 5392 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5393 struct drbd_device *device = peer_device->device; 5394 5395 if (device->state.conn == C_AHEAD && 5396 atomic_read(&device->ap_in_flight) == 0 && 5397 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5398 device->start_resync_timer.expires = jiffies + HZ; 5399 add_timer(&device->start_resync_timer); 5400 } 5401 } 5402 rcu_read_unlock(); 5403 5404 return 0; 5405 } 5406 5407 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5408 { 5409 struct drbd_peer_device *peer_device; 5410 struct drbd_device *device; 5411 struct p_block_ack *p = pi->data; 5412 struct drbd_device_work *dw; 5413 sector_t sector; 5414 int size; 5415 5416 peer_device = conn_peer_device(connection, pi->vnr); 5417 if (!peer_device) 5418 return -EIO; 5419 device = peer_device->device; 5420 5421 sector = be64_to_cpu(p->sector); 5422 size = be32_to_cpu(p->blksize); 5423 5424 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5425 5426 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5427 drbd_ov_out_of_sync_found(device, sector, size); 5428 else 5429 ov_out_of_sync_print(device); 5430 5431 if (!get_ldev(device)) 5432 return 0; 5433 5434 drbd_rs_complete_io(device, sector); 5435 dec_rs_pending(device); 5436 5437 --device->ov_left; 5438 5439 /* let's advance progress step marks only for every other megabyte */ 5440 if ((device->ov_left & 0x200) == 0x200) 5441 drbd_advance_rs_marks(device, device->ov_left); 5442 5443 if (device->ov_left 
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

struct meta_sock_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
	long t;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
	rcu_read_unlock();

	t *= HZ;
	if (ping_timeout)
		t /= 10;

	connection->meta.socket->sk->sk_rcvtimeo = t;
}

static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}

static struct meta_sock_cmd ack_receiver_tbl[] = {
	[P_PING]	      = { 0, got_Ping },
	[P_PING_ACK]	      = { 0, got_PingAck },
	[P_RECV_ACK]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	      = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	      = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]     = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	      = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	      = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY]   = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]     = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	      = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	      = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	      = { sizeof(struct p_block_ack), got_BlockAck },
};
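
/*
 * Dispatch overview (descriptive only; the logic lives in drbd_ack_receiver()
 * below): each header read from the meta socket is decoded into a packet_info,
 * pi.cmd indexes ack_receiver_tbl[], pkt_size is the payload expected after
 * the header (0 for P_PING/P_PING_ACK), and fn is invoked once header plus
 * payload have been received in full.  A command outside the table, or one
 * without a handler, is treated as a protocol error and leads to disconnect;
 * a payload size that does not match pkt_size forces a reconnect.
 *
 * The receive timeout on the meta socket doubles as the keep-alive mechanism:
 * while a P_PING_ACK is outstanding the timeout is ping_timeo (configured in
 * tenths of a second), otherwise it is the idle ping interval ping_int
 * (in seconds).
 */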
int drbd_ack_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct meta_sock_cmd *cmd = NULL;
	struct packet_info pi;
	unsigned long pre_recv_jif;
	int rv;
	void *buf = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect = header_size;
	bool ping_timeout_active = false;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		conn_reclaim_net_peer_reqs(connection);

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			set_ping_timeout(connection);
			ping_timeout_active = true;
		}

		pre_recv_jif = jiffies;
		rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS	 (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;

				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received, pre_recv_jif))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			/* maybe drbd_thread_stop(): the while condition will notice.
			 * maybe woken for send_ping: we'll send a ping above,
			 * and change the rcvtimeo */
			flush_signals(current);
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			/* check the command before indexing the table */
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) ||
			    !ack_receiver_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &ack_receiver_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
				set_idle_timeout(connection);
				ping_timeout_active = false;
			}

			buf = connection->meta.rbuf;
			received = 0;
			expect = header_size;
			cmd = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	drbd_info(connection, "ack_receiver terminated\n");

	return 0;
}
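
/*
 * Work-queue callback that flushes pending peer-request acks for one
 * peer_device.  If net_conf->tcp_cork is enabled, the meta socket is corked
 * around drbd_finish_peer_reqs() so that individual acks coalesce into fewer
 * TCP segments; on the error path the uncork is skipped, presumably because
 * the connection is forced into C_NETWORK_FAILURE and torn down anyway.
 */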
void drbd_send_acks_wf(struct work_struct *ws)
{
	struct drbd_peer_device *peer_device =
		container_of(ws, struct drbd_peer_device, send_acks_work);
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	struct net_conf *nc;
	int tcp_cork, err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	tcp_cork = nc->tcp_cork;
	rcu_read_unlock();

	if (tcp_cork)
		drbd_tcp_cork(connection->meta.socket);

	err = drbd_finish_peer_reqs(device);
	kref_put(&device->kref, drbd_destroy_device);
	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
	 * struct work_struct send_acks_work alive, which is in the peer_device object */

	if (err) {
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		return;
	}

	if (tcp_cork)
		drbd_tcp_uncork(connection->meta.socket);

	return;
}