/*
   drbd_req.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>

#include <linux/slab.h>
#include <linux/drbd.h>
#include "drbd_int.h"
#include "drbd_req.h"


static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);

/* Update disk stats at start of I/O request */
static void _drbd_start_io_acct(struct drbd_device *device, struct drbd_request *req)
{
	generic_start_io_acct(bio_data_dir(req->master_bio), req->i.size >> 9,
			      &device->vdisk->part0);
}

/* Update disk stats when completing request upwards */
static void _drbd_end_io_acct(struct drbd_device *device, struct drbd_request *req)
{
	generic_end_io_acct(bio_data_dir(req->master_bio),
			    &device->vdisk->part0, req->start_jif);
}

static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
{
	struct drbd_request *req;

	req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
	if (!req)
		return NULL;
	memset(req, 0, sizeof(*req));

	drbd_req_make_private_bio(req, bio_src);
	req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
		      | (bio_op(bio_src) == REQ_OP_WRITE_SAME ? RQ_WSAME : 0)
		      | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_UNMAP : 0)
		      | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
	req->device = device;
	req->master_bio = bio_src;
	req->epoch = 0;

	drbd_clear_interval(&req->i);
	req->i.sector = bio_src->bi_iter.bi_sector;
	req->i.size = bio_src->bi_iter.bi_size;
	req->i.local = true;
	req->i.waiting = false;

	INIT_LIST_HEAD(&req->tl_requests);
	INIT_LIST_HEAD(&req->w.list);
	INIT_LIST_HEAD(&req->req_pending_master_completion);
	INIT_LIST_HEAD(&req->req_pending_local);

	/* one reference to be put by __drbd_make_request */
	atomic_set(&req->completion_ref, 1);
	/* one kref as long as completion_ref > 0 */
	kref_init(&req->kref);
	return req;
}

static void drbd_remove_request_interval(struct rb_root *root,
					 struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct drbd_interval *i = &req->i;

	drbd_remove_interval(root, i);

	/* Wake up any processes waiting for this request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}
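/* Final destructor for a request object, invoked via
 * kref_put(&req->kref, drbd_req_destroy) once the last reference is gone.
 * By then the request must no longer be pending locally or on the network;
 * the sanity check at the top of the function catches refcounting bugs. */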
void drbd_req_destroy(struct kref *kref)
{
	struct drbd_request *req = container_of(kref, struct drbd_request, kref);
	struct drbd_device *device = req->device;
	const unsigned s = req->rq_state;

	if ((req->master_bio && !(s & RQ_POSTPONED)) ||
		atomic_read(&req->completion_ref) ||
		(s & RQ_LOCAL_PENDING) ||
		((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
		drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
				s, atomic_read(&req->completion_ref));
		return;
	}

	/* If called from mod_rq_state (expected normal case) or
	 * drbd_send_and_submit (the less likely normal path), this holds the
	 * req_lock, and req->tl_requests will typically be on ->transfer_log,
	 * though it may be still empty (never added to the transfer log).
	 *
	 * If called from do_retry(), we do NOT hold the req_lock, but we are
	 * still allowed to unconditionally list_del(&req->tl_requests),
	 * because it will be on a local on-stack list only. */
	list_del_init(&req->tl_requests);

	/* finally remove the request from the conflict detection
	 * respective block_id verification interval tree. */
	if (!drbd_interval_empty(&req->i)) {
		struct rb_root *root;

		if (s & RQ_WRITE)
			root = &device->write_requests;
		else
			root = &device->read_requests;
		drbd_remove_request_interval(root, req);
	} else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
		drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
			s, (unsigned long long)req->i.sector, req->i.size);

	/* if it was a write, we may have to set the corresponding
	 * bit(s) out-of-sync first. If it had a local part, we need to
	 * release the reference to the activity log. */
	if (s & RQ_WRITE) {
		/* Set out-of-sync unless both OK flags are set
		 * (local only or remote failed).
		 * Other places where we set out-of-sync:
		 * READ with local io-error */

		/* There is a special case:
		 * we may notice late that IO was suspended,
		 * and postpone, or schedule for retry, a write,
		 * before it even was submitted or sent.
		 * In that case we do not want to touch the bitmap at all.
		 */
		if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
			if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
				drbd_set_out_of_sync(device, req->i.sector, req->i.size);

			if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
				drbd_set_in_sync(device, req->i.sector, req->i.size);
		}

		/* one might be tempted to move the drbd_al_complete_io
		 * to the local io completion callback drbd_request_endio.
		 * but, if this was a mirror write, we may only
		 * drbd_al_complete_io after this is RQ_NET_DONE,
		 * otherwise the extent could be dropped from the al
		 * before it has actually been written on the peer.
		 * if we crash before our peer knows about the request,
		 * but after the extent has been dropped from the al,
		 * we would forget to resync the corresponding extent.
		 */
		if (s & RQ_IN_ACT_LOG) {
			if (get_ldev_if_state(device, D_FAILED)) {
				drbd_al_complete_io(device, &req->i);
				put_ldev(device);
			} else if (__ratelimit(&drbd_ratelimit_state)) {
				drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
					  "but my Disk seems to have failed :(\n",
					  (unsigned long long) req->i.sector, req->i.size);
			}
		}
	}

	mempool_free(req, drbd_request_mempool);
}
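/* Transfer log epochs: all writes issued between two P_BARRIER packets share
 * the same epoch number.  connection->current_tle_nr is the number of the
 * currently open epoch, connection->current_tle_writes counts the writes
 * already assigned to it.  Closing the epoch here wakes the sender, which
 * separates the epochs on the wire; the corresponding barrier ack later
 * allows the requests of the closed epoch to be cleaned from the transfer
 * log (see BARRIER_ACKED in __req_mod()). */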
static void wake_all_senders(struct drbd_connection *connection)
{
	wake_up(&connection->sender_work.q_wait);
}

/* must hold resource->req_lock */
void start_new_tl_epoch(struct drbd_connection *connection)
{
	/* no point closing an epoch, if it is empty, anyways. */
	if (connection->current_tle_writes == 0)
		return;

	connection->current_tle_writes = 0;
	atomic_inc(&connection->current_tle_nr);
	wake_all_senders(connection);
}

void complete_master_bio(struct drbd_device *device,
		struct bio_and_error *m)
{
	m->bio->bi_error = m->error;
	bio_endio(m->bio);
	dec_ap_bio(device);
}


/* Helper for __req_mod().
 * Set m->bio to the master bio, if it is fit to be completed,
 * or leave it alone (it is initialized to NULL in __req_mod),
 * if it has already been completed, or cannot be completed yet.
 * If m->bio is set, the error status to be returned is placed in m->error.
 */
static
void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
{
	const unsigned s = req->rq_state;
	struct drbd_device *device = req->device;
	int error, ok;

	/* we must not complete the master bio, while it is
	 *	still being processed by _drbd_send_zc_bio (drbd_send_dblock)
	 *	not yet acknowledged by the peer
	 *	not yet completed by the local io subsystem
	 * these flags may get cleared in any order by
	 *	the worker,
	 *	the receiver,
	 *	the bio_endio completion callbacks.
	 */
	if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
	    (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
	    (s & RQ_COMPLETION_SUSP)) {
		drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
		return;
	}

	if (!req->master_bio) {
		drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
		return;
	}

	/*
	 * figure out whether to report success or failure.
	 *
	 * report success when at least one of the operations succeeded.
	 * or, to put it the other way,
	 * only report failure, when both operations failed.
	 *
	 * what to do about the failures is handled elsewhere.
	 * what we need to do here is just: complete the master_bio.
	 *
	 * local completion error, if any, has been stored as ERR_PTR
	 * in private_bio within drbd_request_endio.
	 */
	ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
	error = PTR_ERR(req->private_bio);

	/* Before we can signal completion to the upper layers,
	 * we may need to close the current transfer log epoch.
	 * We are within the request lock, so we can simply compare
	 * the request epoch number with the current transfer log
	 * epoch number.  If they match, increase the current_tle_nr,
	 * and reset the transfer log epoch write_cnt.
	 */
	if (op_is_write(bio_op(req->master_bio)) &&
	    req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
		start_new_tl_epoch(first_peer_device(device)->connection);

	/* Update disk stats */
	_drbd_end_io_acct(device, req);

	/* If READ failed,
	 * have it be pushed back to the retry work queue,
	 * so it will re-enter __drbd_make_request(),
	 * and be re-assigned to a suitable local or remote path,
	 * or failed if we do not have access to good data anymore.
	 *
	 * Unless it was failed early by __drbd_make_request(),
	 * because no path was available, in which case
	 * it was not even added to the transfer_log.
	 *
	 * read-ahead may fail, and will not be retried.
	 *
	 * WRITE should have used all available paths already.
	 */
	if (!ok &&
	    bio_op(req->master_bio) == REQ_OP_READ &&
	    !(req->master_bio->bi_opf & REQ_RAHEAD) &&
	    !list_empty(&req->tl_requests))
		req->rq_state |= RQ_POSTPONED;

	if (!(req->rq_state & RQ_POSTPONED)) {
		m->error = ok ? 0 : (error ?: -EIO);
		m->bio = req->master_bio;
		req->master_bio = NULL;
		/* We leave it in the tree, to be able to verify later
		 * write-acks in protocol != C during resync.
		 * But we mark it as "complete", so it won't be counted as
		 * conflict in a multi-primary setup. */
		req->i.completed = true;
	}

	if (req->i.waiting)
		wake_up(&device->misc_wait);

	/* Either we are about to complete to upper layers,
	 * or we will restart this request.
	 * In either case, the request object will be destroyed soon,
	 * so better remove it from all lists. */
	list_del_init(&req->req_pending_master_completion);
}
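/* req->completion_ref counts the reasons why the master bio cannot be
 * completed to the upper layers yet (local submission pending, network send
 * queued or pending, completion suspended, ...).  Dropping it to zero
 * completes (or, for RQ_POSTPONED requests, restarts) the request. */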
/* still holds resource->req_lock */
static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
{
	struct drbd_device *device = req->device;
	D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));

	if (!put)
		return;

	if (!atomic_sub_and_test(put, &req->completion_ref))
		return;

	drbd_req_complete(req, m);

	/* local completion may still come in later,
	 * we need to keep the req object around. */
	if (req->rq_state & RQ_LOCAL_ABORTED)
		return;

	if (req->rq_state & RQ_POSTPONED) {
		/* don't destroy the req object just yet,
		 * but queue it for retry */
		drbd_restart_request(req);
		return;
	}

	kref_put(&req->kref, drbd_req_destroy);
}
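/* The connection keeps cached pointers to the oldest request of interest:
 * req_next (oldest request still queued to be sent), req_ack_pending (oldest
 * sent request still waiting for an ack) and req_not_net_done (oldest sent
 * request not yet "done" on the network), used by the sender and by
 * request_timer_fn().  The set_if_null_* helpers establish such a pointer
 * when a request enters the respective state; the advance_* helpers move it
 * forward along the transfer log when the current oldest one leaves it. */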
static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_next == NULL)
		connection->req_next = req;
}

static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_next != req)
		return;
	list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
		const unsigned s = req->rq_state;
		if (s & RQ_NET_QUEUED)
			break;
	}
	if (&req->tl_requests == &connection->transfer_log)
		req = NULL;
	connection->req_next = req;
}

static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_ack_pending == NULL)
		connection->req_ack_pending = req;
}

static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_ack_pending != req)
		return;
	list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
		const unsigned s = req->rq_state;
		if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING))
			break;
	}
	if (&req->tl_requests == &connection->transfer_log)
		req = NULL;
	connection->req_ack_pending = req;
}

static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_not_net_done == NULL)
		connection->req_not_net_done = req;
}

static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_not_net_done != req)
		return;
	list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
		const unsigned s = req->rq_state;
		if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE))
			break;
	}
	if (&req->tl_requests == &connection->transfer_log)
		req = NULL;
	connection->req_not_net_done = req;
}
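/* mod_rq_state() applies a state transition (clear/set of RQ_* bits) to a
 * request and translates it into reference count operations: bits that mean
 * "something still has to happen" take a completion_ref (and sometimes an
 * extra kref) when they are set, and drop it again when they are cleared.
 * The drbd_req_put_completion_ref()/kref_put() pair at the end then
 * completes and/or destroys the request once nothing refers to it anymore. */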
/* I'd like this to be the only place that manipulates
 * req->completion_ref and req->kref. */
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
		int clear, int set)
{
	struct drbd_device *device = req->device;
	struct drbd_peer_device *peer_device = first_peer_device(device);
	unsigned s = req->rq_state;
	int c_put = 0;

	if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
		set |= RQ_COMPLETION_SUSP;

	/* apply */

	req->rq_state &= ~clear;
	req->rq_state |= set;

	/* no change? */
	if (req->rq_state == s)
		return;

	/* intent: get references */

	kref_get(&req->kref);

	if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
		atomic_inc(&req->completion_ref);

	if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
		inc_ap_pending(device);
		atomic_inc(&req->completion_ref);
	}

	if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
		atomic_inc(&req->completion_ref);
		set_if_null_req_next(peer_device, req);
	}

	if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
		kref_get(&req->kref); /* wait for the DONE */

	if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
		/* potentially already completed in the ack_receiver thread */
		if (!(s & RQ_NET_DONE)) {
			atomic_add(req->i.size >> 9, &device->ap_in_flight);
			set_if_null_req_not_net_done(peer_device, req);
		}
		if (req->rq_state & RQ_NET_PENDING)
			set_if_null_req_ack_pending(peer_device, req);
	}

	if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
		atomic_inc(&req->completion_ref);

	/* progress: put references */

	if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
		++c_put;

	if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
		D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
		++c_put;
	}

	if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
		if (req->rq_state & RQ_LOCAL_ABORTED)
			kref_put(&req->kref, drbd_req_destroy);
		else
			++c_put;
		list_del_init(&req->req_pending_local);
	}

	if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
		dec_ap_pending(device);
		++c_put;
		req->acked_jif = jiffies;
		advance_conn_req_ack_pending(peer_device, req);
	}

	if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
		++c_put;
		advance_conn_req_next(peer_device, req);
	}

	if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
		if (s & RQ_NET_SENT)
			atomic_sub(req->i.size >> 9, &device->ap_in_flight);
		if (s & RQ_EXP_BARR_ACK)
			kref_put(&req->kref, drbd_req_destroy);
		req->net_done_jif = jiffies;

		/* in ahead/behind mode, or just in case,
		 * before we finally destroy this request,
		 * the caching pointers must not reference it anymore */
		advance_conn_req_next(peer_device, req);
		advance_conn_req_ack_pending(peer_device, req);
		advance_conn_req_not_net_done(peer_device, req);
	}

	/* potentially complete and destroy */

	/* If we made progress, retry conflicting peer requests, if any. */
	if (req->i.waiting)
		wake_up(&device->misc_wait);

	drbd_req_put_completion_ref(req, m, c_put);
	kref_put(&req->kref, drbd_req_destroy);
}

static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
{
	char b[BDEVNAME_SIZE];

	if (!__ratelimit(&drbd_ratelimit_state))
		return;

	drbd_warn(device, "local %s IO error sector %llu+%u on %s\n",
		  (req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
		  (unsigned long long)req->i.sector,
		  req->i.size >> 9,
		  bdevname(device->ldev->backing_bdev, b));
}
/* Helper for HANDED_OVER_TO_NETWORK.
 * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
 * Is it also still "PENDING"?
 * --> If so, clear PENDING and set NET_OK below.
 * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
 * (and we must not set RQ_NET_OK) */
static inline bool is_pending_write_protocol_A(struct drbd_request *req)
{
	return (req->rq_state &
		(RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
		== (RQ_WRITE|RQ_NET_PENDING);
}

/* obviously this could be coded as many single functions
 * instead of one huge switch,
 * or by putting the code directly in the respective locations
 * (as it has been before).
 *
 * but having it this way
 *	enforces that it is all in this one place, where it is easier to audit,
 *	it makes it obvious that whatever "event" "happens" to a request should
 *	happen "atomically" within the req_lock,
 *	and it enforces that we have to think in a very structured manner
 *	about the "events" that may happen to a request during its life time ...
 */
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
		struct bio_and_error *m)
{
	struct drbd_device *const device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	struct net_conf *nc;
	int p, rv = 0;

	if (m)
		m->bio = NULL;

	switch (what) {
	default:
		drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
		break;

	/* does not happen...
	 * initialization done in drbd_req_new
	case CREATED:
		break;
		*/

	case TO_BE_SENT: /* via network */
		/* reached via __drbd_make_request
		 * and from w_read_retry_remote */
		D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		p = nc->wire_protocol;
		rcu_read_unlock();
		req->rq_state |=
			p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
			p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
		mod_rq_state(req, m, 0, RQ_NET_PENDING);
		break;

	case TO_BE_SUBMITTED: /* locally */
		/* reached via __drbd_make_request */
		D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
		mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
		break;

	case COMPLETED_OK:
		if (req->rq_state & RQ_WRITE)
			device->writ_cnt += req->i.size >> 9;
		else
			device->read_cnt += req->i.size >> 9;

		mod_rq_state(req, m, RQ_LOCAL_PENDING,
				RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
		break;

	case ABORT_DISK_IO:
		mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
		break;

	case WRITE_COMPLETED_WITH_ERROR:
		drbd_report_io_error(device, req);
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;

	case READ_COMPLETED_WITH_ERROR:
		drbd_set_out_of_sync(device, req->i.sector, req->i.size);
		drbd_report_io_error(device, req);
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
		/* fall through. */
	case READ_AHEAD_COMPLETED_WITH_ERROR:
		/* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;

	case DISCARD_COMPLETED_NOTSUPP:
	case DISCARD_COMPLETED_WITH_ERROR:
		/* I'd rather not detach from local disk just because it
		 * failed a REQ_DISCARD. */
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;
	case QUEUE_FOR_NET_READ:
		/* READ, and
		 * no local disk,
		 * or target area marked as invalid,
		 * or just got an io-error. */
		/* from __drbd_make_request
		 * or from bio_endio during read io-error recovery */

		/* So we can verify the handle in the answer packet.
		 * Corresponding drbd_remove_request_interval is in
		 * drbd_req_complete() */
		D_ASSERT(device, drbd_interval_empty(&req->i));
		drbd_insert_interval(&device->read_requests, &req->i);

		set_bit(UNPLUG_REMOTE, &device->flags);

		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
		req->w.cb = w_send_read_req;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;

	case QUEUE_FOR_NET_WRITE:
		/* assert something? */
		/* from __drbd_make_request only */

		/* Corresponding drbd_remove_request_interval is in
		 * drbd_req_complete() */
		D_ASSERT(device, drbd_interval_empty(&req->i));
		drbd_insert_interval(&device->write_requests, &req->i);

		/* NOTE
		 * In case the req ended up on the transfer log before being
		 * queued on the worker, it could lead to this request being
		 * missed during cleanup after connection loss.
		 * So we have to do both operations here,
		 * within the same lock that protects the transfer log.
		 *
		 * _req_add_to_epoch(req); this has to be after the
		 * _maybe_start_new_epoch(req); which happened in
		 * __drbd_make_request, because we now may set the bit
		 * again ourselves to close the current epoch.
		 *
		 * Add req to the (now) current epoch (barrier). */

		/* otherwise we may lose an unplug, which may cause some remote
		 * io-scheduler timeout to expire, increasing maximum latency,
		 * hurting performance. */
		set_bit(UNPLUG_REMOTE, &device->flags);

		/* queue work item to send data */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
		req->w.cb = w_send_dblock;
		drbd_queue_work(&connection->sender_work,
				&req->w);

		/* close the epoch, in case it outgrew the limit */
		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		p = nc->max_epoch_size;
		rcu_read_unlock();
		if (connection->current_tle_writes >= p)
			start_new_tl_epoch(connection);

		break;

	case QUEUE_FOR_SEND_OOS:
		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
		req->w.cb = w_send_out_of_sync;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;

	case READ_RETRY_REMOTE_CANCELED:
	case SEND_CANCELED:
	case SEND_FAILED:
		/* real cleanup will be done from tl_clear.  just update flags
		 * so it is no longer marked as on the worker queue */
		mod_rq_state(req, m, RQ_NET_QUEUED, 0);
		break;

	case HANDED_OVER_TO_NETWORK:
		/* assert something? */
		if (is_pending_write_protocol_A(req))
			/* this is what is dangerous about protocol A:
			 * pretend it was successfully written on the peer. */
			mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
						RQ_NET_SENT|RQ_NET_OK);
		else
			mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
		/* It is still not yet RQ_NET_DONE until the
		 * corresponding epoch barrier got acked as well,
		 * so we know what to dirty on connection loss. */
		break;

	case OOS_HANDED_TO_NETWORK:
		/* Was not set PENDING, no longer QUEUED, so is now DONE
		 * as far as this connection is concerned. */
		mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
		break;

	case CONNECTION_LOST_WHILE_PENDING:
		/* transfer log cleanup after connection loss */
		mod_rq_state(req, m,
				RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
				RQ_NET_DONE);
		break;

	case CONFLICT_RESOLVED:
		/* for superseded conflicting writes of multiple primaries,
		 * there is no need to keep anything in the tl, potential
		 * node crashes are covered by the activity log.
		 *
		 * If this request had been marked as RQ_POSTPONED before,
		 * it will actually not be completed, but "restarted",
		 * resubmitted from the retry worker context. */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
		break;
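	/* The ack related events below map to the DRBD replication protocols:
	 * protocol A considers a write done once it was handed to the network
	 * (HANDED_OVER_TO_NETWORK above), protocol B once the peer confirmed
	 * reception (RECV_ACKED_BY_PEER, P_RECV_ACK), and protocol C once the
	 * peer confirmed the write to its disk (WRITE_ACKED_BY_PEER,
	 * P_WRITE_ACK).  The _AND_SIS ("set in sync") variant additionally
	 * allows us to mark the area in sync once the request is destroyed,
	 * see RQ_NET_SIS in drbd_req_destroy(). */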
	case WRITE_ACKED_BY_PEER_AND_SIS:
		req->rq_state |= RQ_NET_SIS;
	case WRITE_ACKED_BY_PEER:
		/* Normal operation protocol C: successfully written on peer.
		 * During resync, even in protocol != C,
		 * we requested an explicit write ack anyways.
		 * Which means we cannot even assert anything here.
		 * Nothing more to do here.
		 * We want to keep the tl in place for all protocols, to cater
		 * for volatile write-back caches on lower level devices. */
		goto ack_common;
	case RECV_ACKED_BY_PEER:
		D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
		/* protocol B; pretends to be successfully written on peer.
		 * see also notes above in HANDED_OVER_TO_NETWORK about
		 * protocol != C */
	ack_common:
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
		break;

	case POSTPONE_WRITE:
		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
		/* If this node has already detected the write conflict, the
		 * worker will be waiting on misc_wait.  Wake it up once this
		 * request has completed locally.
		 */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_POSTPONED;
		if (req->i.waiting)
			wake_up(&device->misc_wait);
		/* Do not clear RQ_NET_PENDING. This request will make further
		 * progress via restart_conflicting_writes() or
		 * fail_postponed_requests(). Hopefully. */
		break;

	case NEG_ACKED:
		mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
		break;

	case FAIL_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;
		mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
		break;

	case RESTART_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		mod_rq_state(req, m,
				RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
				RQ_LOCAL_PENDING);

		rv = MR_READ;
		if (bio_data_dir(req->master_bio) == WRITE)
			rv = MR_WRITE;

		get_ldev(device); /* always succeeds in this call path */
		req->w.cb = w_restart_disk_io;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;
	case RESEND:
		/* Simply complete (local only) READs. */
		if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
			mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
			break;
		}

		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
		   before the connection loss (B&C only); only P_BARRIER_ACK
		   (or the local completion?) was missing when we suspended.
		   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
		   During connection handshake, we ensure that the peer was not rebooted. */
		if (!(req->rq_state & RQ_NET_OK)) {
			/* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
			 * in that case we must not set RQ_NET_PENDING. */

			mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
			if (req->w.cb) {
				/* w.cb expected to be w_send_dblock, or w_send_read_req */
				drbd_queue_work(&connection->sender_work,
						&req->w);
				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
			} /* else: FIXME can this happen? */
			break;
		}
		/* else, fall through to BARRIER_ACKED */

	case BARRIER_ACKED:
		/* barrier ack for READ requests does not make sense */
		if (!(req->rq_state & RQ_WRITE))
			break;

		if (req->rq_state & RQ_NET_PENDING) {
			/* barrier came in before all requests were acked.
			 * this is bad, because if the connection is lost now,
			 * we won't be able to clean them up... */
			drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
		}
		/* Allowed to complete requests, even while suspended.
		 * As this is called for all requests within a matching epoch,
		 * we need to filter, and only set RQ_NET_DONE for those that
		 * have actually been on the wire. */
		mod_rq_state(req, m, RQ_COMPLETION_SUSP,
				(req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
		break;

	case DATA_RECEIVED:
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
		break;

	case QUEUE_AS_DRBD_BARRIER:
		start_new_tl_epoch(connection);
		mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
		break;
	}

	return rv;
}
/* we may do a local read if:
 * - we are consistent (of course),
 * - or we are generally inconsistent,
 *   BUT we are still/already IN SYNC for this area.
 *   since size may be bigger than BM_BLOCK_SIZE,
 *   we may need to check several bits.
 */
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
{
	unsigned long sbnr, ebnr;
	sector_t esector, nr_sectors;

	if (device->state.disk == D_UP_TO_DATE)
		return true;
	if (device->state.disk != D_INCONSISTENT)
		return false;
	esector = sector + (size >> 9) - 1;
	nr_sectors = drbd_get_capacity(device->this_bdev);
	D_ASSERT(device, sector < nr_sectors);
	D_ASSERT(device, esector < nr_sectors);

	sbnr = BM_SECT_TO_BIT(sector);
	ebnr = BM_SECT_TO_BIT(esector);

	return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
}
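/* Read balancing: decide whether a READ that we could serve locally should
 * be shipped to the peer instead.  For the RB_*_STRIPING policies the stripe
 * size determines the shift: e.g. RB_32K_STRIPING uses stripe_shift = 15
 * (2^15 bytes = 32 KiB), so (sector >> (15 - 9)) & 1 alternates between
 * local and remote every 64 sectors, i.e. every 32 KiB of device offset. */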
static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
					 enum drbd_read_balancing rbm)
{
	struct backing_dev_info *bdi;
	int stripe_shift;

	switch (rbm) {
	case RB_CONGESTED_REMOTE:
		bdi = device->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
		return bdi_read_congested(bdi);
	case RB_LEAST_PENDING:
		return atomic_read(&device->local_cnt) >
			atomic_read(&device->ap_pending_cnt) + atomic_read(&device->rs_pending_cnt);
	case RB_32K_STRIPING:  /* stripe_shift = 15 */
	case RB_64K_STRIPING:
	case RB_128K_STRIPING:
	case RB_256K_STRIPING:
	case RB_512K_STRIPING:
	case RB_1M_STRIPING:   /* stripe_shift = 20 */
		stripe_shift = (rbm - RB_32K_STRIPING + 15);
		return (sector >> (stripe_shift - 9)) & 1;
	case RB_ROUND_ROBIN:
		return test_and_change_bit(READ_BALANCE_RR, &device->flags);
	case RB_PREFER_REMOTE:
		return true;
	case RB_PREFER_LOCAL:
	default:
		return false;
	}
}

/*
 * complete_conflicting_writes  -  wait for any conflicting write requests
 *
 * The write_requests tree contains all active write requests which we
 * currently know about.  Wait for any requests to complete which conflict
 * with the new one.
 *
 * Only way out: remove the conflicting intervals from the tree.
 */
static void complete_conflicting_writes(struct drbd_request *req)
{
	DEFINE_WAIT(wait);
	struct drbd_device *device = req->device;
	struct drbd_interval *i;
	sector_t sector = req->i.sector;
	int size = req->i.size;

	for (;;) {
		drbd_for_each_overlap(i, &device->write_requests, sector, size) {
			/* Ignore, if already completed to upper layers. */
			if (i->completed)
				continue;
			/* Handle the first found overlap.  After the schedule
			 * we have to restart the tree walk. */
			break;
		}
		if (!i)	/* if any */
			break;

		/* Indicate to wake up device->misc_wait on progress.  */
		prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
		i->waiting = true;
		spin_unlock_irq(&device->resource->req_lock);
		schedule();
		spin_lock_irq(&device->resource->req_lock);
	}
	finish_wait(&device->misc_wait, &wait);
}
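/* Congestion policy: if the configured on-congestion handling is not the
 * default "block", and either more data is in flight than cong-fill allows
 * or more activity log extents are hot than cong-extents allows, we stop
 * replicating full data.  Depending on the configuration we either go
 * C_AHEAD (only tell the peer which blocks became out of sync, to be
 * resynced later) or disconnect. */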
/* called within req_lock */
static void maybe_pull_ahead(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;
	struct net_conf *nc;
	bool congested = false;
	enum drbd_on_congestion on_congestion;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	on_congestion = nc ? nc->on_congestion : OC_BLOCK;
	rcu_read_unlock();
	if (on_congestion == OC_BLOCK ||
	    connection->agreed_pro_version < 96)
		return;

	if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
		return; /* nothing to do ... */

	/* If I don't even have good local storage, we can not reasonably try
	 * to pull ahead of the peer. We also need the local reference to make
	 * sure device->act_log is there.
	 */
	if (!get_ldev_if_state(device, D_UP_TO_DATE))
		return;

	if (nc->cong_fill &&
	    atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
		drbd_info(device, "Congestion-fill threshold reached\n");
		congested = true;
	}

	if (device->act_log->used >= nc->cong_extents) {
		drbd_info(device, "Congestion-extents threshold reached\n");
		congested = true;
	}

	if (congested) {
		/* start a new epoch for non-mirrored writes */
		start_new_tl_epoch(first_peer_device(device)->connection);

		if (on_congestion == OC_PULL_AHEAD)
			_drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
		else  /* nc->on_congestion == OC_DISCONNECT */
			_drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
	}
	put_ldev(device);
}

/* If this returns false, and req->private_bio is still set,
 * this should be submitted locally.
 *
 * If it returns false, but req->private_bio is not set,
 * we do not have access to good data :(
 *
 * Otherwise, this destroys req->private_bio, if any,
 * and returns true.
 */
static bool do_remote_read(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	enum drbd_read_balancing rbm;

	if (req->private_bio) {
		if (!drbd_may_do_local_read(device,
					req->i.sector, req->i.size)) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
	}

	if (device->state.pdsk != D_UP_TO_DATE)
		return false;

	if (req->private_bio == NULL)
		return true;

	/* TODO: improve read balancing decisions, take into account drbd
	 * protocol, pending requests etc. */

	rcu_read_lock();
	rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
	rcu_read_unlock();

	if (rbm == RB_PREFER_LOCAL && req->private_bio)
		return false; /* submit locally */

	if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
		return true;
	}

	return false;
}
bool drbd_should_do_remote(union drbd_dev_state s)
{
	return s.pdsk == D_UP_TO_DATE ||
		(s.pdsk >= D_INCONSISTENT &&
		 s.conn >= C_WF_BITMAP_T &&
		 s.conn < C_AHEAD);
	/* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
	   That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
	   states. */
}

static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
{
	return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
	/* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
	   since we enter state C_AHEAD only if proto >= 96 */
}

/* returns number of connections (== 1, for drbd 8.4)
 * expected to actually write this data,
 * which does NOT include those that we are L_AHEAD for. */
static int drbd_process_write_request(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	int remote, send_oos;

	remote = drbd_should_do_remote(device->state);
	send_oos = drbd_should_send_out_of_sync(device->state);

	/* Need to replicate writes.  Unless it is an empty flush,
	 * which is better mapped to a DRBD P_BARRIER packet,
	 * also for drbd wire protocol compatibility reasons.
	 * If this was a flush, just start a new epoch.
	 * Unless the current epoch was empty anyways, or we are not currently
	 * replicating, in which case there is no point. */
	if (unlikely(req->i.size == 0)) {
		/* The only size==0 bios we expect are empty flushes. */
		D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
		if (remote)
			_req_mod(req, QUEUE_AS_DRBD_BARRIER);
		return remote;
	}

	if (!remote && !send_oos)
		return 0;

	D_ASSERT(device, !(remote && send_oos));

	if (remote) {
		_req_mod(req, TO_BE_SENT);
		_req_mod(req, QUEUE_FOR_NET_WRITE);
	} else if (drbd_set_out_of_sync(device, req->i.sector, req->i.size))
		_req_mod(req, QUEUE_FOR_SEND_OOS);

	return remote;
}

static void drbd_process_discard_req(struct drbd_request *req)
{
	struct block_device *bdev = req->device->ldev->backing_bdev;

	if (blkdev_issue_zeroout(bdev, req->i.sector, req->i.size >> 9,
			GFP_NOIO, 0))
		req->private_bio->bi_error = -EIO;
	bio_endio(req->private_bio);
}

static void
drbd_submit_req_private_bio(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct bio *bio = req->private_bio;
	unsigned int type;

	if (bio_op(bio) != REQ_OP_READ)
		type = DRBD_FAULT_DT_WR;
	else if (bio->bi_opf & REQ_RAHEAD)
		type = DRBD_FAULT_DT_RA;
	else
		type = DRBD_FAULT_DT_RD;

	bio->bi_bdev = device->ldev->backing_bdev;

	/* State may have changed since we grabbed our reference on the
	 * ->ldev member. Double check, and short-circuit to endio.
	 * In case the last activity log transaction failed to get on
	 * stable storage, and this is a WRITE, we may not even submit
	 * this bio. */
	if (get_ldev(device)) {
		if (drbd_insert_fault(device, type))
			bio_io_error(bio);
		else if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
			 bio_op(bio) == REQ_OP_DISCARD)
			drbd_process_discard_req(req);
		else
			generic_make_request(bio);
		put_ldev(device);
	} else
		bio_io_error(bio);
}
static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
{
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&req->tl_requests, &device->submit.writes);
	list_add_tail(&req->req_pending_master_completion,
			&device->pending_master_completion[1 /* WRITE */]);
	spin_unlock_irq(&device->resource->req_lock);
	queue_work(device->submit.wq, &device->submit.worker);
	/* do_submit() may sleep internally on al_wait, too */
	wake_up(&device->al_wait);
}

/* returns the new drbd_request pointer, if the caller is expected to
 * drbd_send_and_submit() it (to save latency), or NULL if we queued the
 * request on the submitter thread.
 * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
 */
static struct drbd_request *
drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
{
	const int rw = bio_data_dir(bio);
	struct drbd_request *req;

	/* allocate outside of all locks; */
	req = drbd_req_new(device, bio);
	if (!req) {
		dec_ap_bio(device);
		/* only pass the error to the upper layers.
		 * if user cannot handle io errors, that's not our business. */
		drbd_err(device, "could not kmalloc() req\n");
		bio->bi_error = -ENOMEM;
		bio_endio(bio);
		return ERR_PTR(-ENOMEM);
	}
	req->start_jif = start_jif;

	if (!get_ldev(device)) {
		bio_put(req->private_bio);
		req->private_bio = NULL;
	}

	/* Update disk stats */
	_drbd_start_io_acct(device, req);

	/* process discards always from our submitter thread */
	if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
	    bio_op(bio) == REQ_OP_DISCARD)
		goto queue_for_submitter_thread;

	if (rw == WRITE && req->private_bio && req->i.size
	&& !test_bit(AL_SUSPENDED, &device->flags)) {
		if (!drbd_al_begin_io_fastpath(device, &req->i))
			goto queue_for_submitter_thread;
		req->rq_state |= RQ_IN_ACT_LOG;
		req->in_actlog_jif = jiffies;
	}
	return req;

 queue_for_submitter_thread:
	atomic_inc(&device->ap_actlog_cnt);
	drbd_queue_write(device, req);
	return NULL;
}

/* Require at least one path to current data.
 * We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
 * We would not allow to read what was written,
 * we would not have bumped the data generation uuids,
 * we would cause data divergence for all the wrong reasons.
 *
 * If we don't see at least one D_UP_TO_DATE, we will fail this request,
 * which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
 * and queues for retry later.
 */
static bool may_do_writes(struct drbd_device *device)
{
	const union drbd_dev_state s = device->state;
	return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
}
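/* drbd_send_and_submit() takes the request lock itself: it first resolves
 * conflicts and congestion for writes, decides whether the request goes to
 * the local disk, to the peer, or both, registers it in the transfer log and
 * the pending lists, and only then drops the lock to submit the private bio
 * (if any) to the backing device. */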
static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
{
	struct drbd_resource *resource = device->resource;
	const int rw = bio_data_dir(req->master_bio);
	struct bio_and_error m = { NULL, };
	bool no_remote = false;
	bool submit_private_bio = false;

	spin_lock_irq(&resource->req_lock);
	if (rw == WRITE) {
		/* This may temporarily give up the req_lock,
		 * but will re-acquire it before it returns here.
		 * Needs to be before the check on drbd_suspended() */
		complete_conflicting_writes(req);
		/* no more giving up req_lock from now on! */

		/* check for congestion, and potentially stop sending
		 * full data updates, but start sending "dirty bits" only. */
		maybe_pull_ahead(device);
	}

	if (drbd_suspended(device)) {
		/* push back and retry: */
		req->rq_state |= RQ_POSTPONED;
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
		goto out;
	}

	/* We fail READ early, if we can not serve it.
	 * We must do this before req is registered on any lists.
	 * Otherwise, drbd_req_complete() will queue failed READ for retry. */
	if (rw != WRITE) {
		if (!do_remote_read(req) && !req->private_bio)
			goto nodata;
	}

	/* which transfer log epoch does this belong to? */
	req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);

	/* no point in adding empty flushes to the transfer log,
	 * they are mapped to drbd barriers already. */
	if (likely(req->i.size != 0)) {
		if (rw == WRITE)
			first_peer_device(device)->connection->current_tle_writes++;

		list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
	}

	if (rw == WRITE) {
		if (req->private_bio && !may_do_writes(device)) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
			goto nodata;
		}
		if (!drbd_process_write_request(req))
			no_remote = true;
	} else {
		/* We either have a private_bio, or we can read from remote.
		 * Otherwise we had done the goto nodata above. */
		if (req->private_bio == NULL) {
			_req_mod(req, TO_BE_SENT);
			_req_mod(req, QUEUE_FOR_NET_READ);
		} else
			no_remote = true;
	}

	/* If it took the fast path in drbd_request_prepare, add it here.
	 * The slow path has added it already. */
	if (list_empty(&req->req_pending_master_completion))
		list_add_tail(&req->req_pending_master_completion,
			&device->pending_master_completion[rw == WRITE]);
	if (req->private_bio) {
		/* needs to be marked within the same spinlock */
		req->pre_submit_jif = jiffies;
		list_add_tail(&req->req_pending_local,
			&device->pending_completion[rw == WRITE]);
		_req_mod(req, TO_BE_SUBMITTED);
		/* but we need to give up the spinlock to submit */
		submit_private_bio = true;
	} else if (no_remote) {
nodata:
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
					(unsigned long long)req->i.sector, req->i.size >> 9);
		/* A write may have been queued for send_oos, however.
		 * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
	}

out:
	drbd_req_put_completion_ref(req, &m, 1);
	spin_unlock_irq(&resource->req_lock);

	/* Even though above is a kref_put(), this is safe.
	 * As long as we still need to submit our private bio,
	 * we hold a completion ref, and the request cannot disappear.
	 * If however this request did not even have a private bio to submit
	 * (e.g. remote read), req may already be invalid now.
	 * That's why we cannot check on req->private_bio. */
	if (submit_private_bio)
		drbd_submit_req_private_bio(req);
	if (m.bio)
		complete_master_bio(device, &m);
}

void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
{
	struct drbd_request *req = drbd_request_prepare(device, bio, start_jif);
	if (IS_ERR_OR_NULL(req))
		return;
	drbd_send_and_submit(device, req);
}

static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
{
	struct drbd_request *req, *tmp;
	list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
		const int rw = bio_data_dir(req->master_bio);

		if (rw == WRITE /* rw != WRITE should not even end up here! */
		&& req->private_bio && req->i.size
		&& !test_bit(AL_SUSPENDED, &device->flags)) {
			if (!drbd_al_begin_io_fastpath(device, &req->i))
				continue;

			req->rq_state |= RQ_IN_ACT_LOG;
			req->in_actlog_jif = jiffies;
			atomic_dec(&device->ap_actlog_cnt);
		}

		list_del_init(&req->tl_requests);
		drbd_send_and_submit(device, req);
	}
}
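/* Sort the incoming requests by what the activity log can currently accept:
 * requests whose AL extents could be reserved without blocking move to
 * "pending" (to be submitted after the next AL transaction commit);
 * -ENOBUFS means the pending transaction is full, so stop and commit first;
 * any other error (typically the extent is temporarily busy, e.g. due to
 * resync) defers the request to the "later" list.  Returns true if anything
 * was moved to "pending". */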
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
					    struct list_head *incoming,
					    struct list_head *pending,
					    struct list_head *later)
{
	struct drbd_request *req, *tmp;
	int wake = 0;
	int err;

	spin_lock_irq(&device->al_lock);
	list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
		err = drbd_al_begin_io_nonblock(device, &req->i);
		if (err == -ENOBUFS)
			break;
		if (err == -EBUSY)
			wake = 1;
		if (err)
			list_move_tail(&req->tl_requests, later);
		else
			list_move_tail(&req->tl_requests, pending);
	}
	spin_unlock_irq(&device->al_lock);
	if (wake)
		wake_up(&device->al_wait);
	return !list_empty(pending);
}

void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
{
	struct drbd_request *req, *tmp;

	list_for_each_entry_safe(req, tmp, pending, tl_requests) {
		req->rq_state |= RQ_IN_ACT_LOG;
		req->in_actlog_jif = jiffies;
		atomic_dec(&device->ap_actlog_cnt);
		list_del_init(&req->tl_requests);
		drbd_send_and_submit(device, req);
	}
}

void do_submit(struct work_struct *ws)
{
	struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
	LIST_HEAD(incoming);	/* from drbd_make_request() */
	LIST_HEAD(pending);	/* to be submitted after next AL-transaction commit */
	LIST_HEAD(busy);	/* blocked by resync requests */

	/* grab new incoming requests */
	spin_lock_irq(&device->resource->req_lock);
	list_splice_tail_init(&device->submit.writes, &incoming);
	spin_unlock_irq(&device->resource->req_lock);

	for (;;) {
		DEFINE_WAIT(wait);

		/* move used-to-be-busy back to front of incoming */
		list_splice_init(&busy, &incoming);
		submit_fast_path(device, &incoming);
		if (list_empty(&incoming))
			break;

		for (;;) {
			prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);

			list_splice_init(&busy, &incoming);
			prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
			if (!list_empty(&pending))
				break;

			schedule();

			/* If all currently "hot" activity log extents are kept busy by
			 * incoming requests, we still must not totally starve new
			 * requests to "cold" extents.
			 * Something left on &incoming means there had not been
			 * enough update slots available, and the activity log
			 * has been marked as "starving".
			 *
			 * Try again now, without looking for new requests,
			 * effectively blocking all new requests until we made
			 * at least _some_ progress with what we currently have.
			 */
			if (!list_empty(&incoming))
				continue;

			/* Nothing moved to pending, but nothing left
			 * on incoming: all moved to busy!
			 * Grab new and iterate. */
			spin_lock_irq(&device->resource->req_lock);
			list_splice_tail_init(&device->submit.writes, &incoming);
			spin_unlock_irq(&device->resource->req_lock);
		}
		finish_wait(&device->al_wait, &wait);

		/* If the transaction was full, before all incoming requests
		 * had been processed, skip ahead to commit, and iterate
		 * without splicing in more incoming requests from upper layers.
		 *
		 * Else, if all incoming have been processed,
		 * they have become either "pending" (to be submitted after
		 * next transaction commit) or "busy" (blocked by resync).
		 *
		 * Maybe more was queued, while we prepared the transaction?
		 * Try to stuff those into this transaction as well.
		 * Be strictly non-blocking here,
		 * we already have something to commit.
		 *
		 * Commit if we don't make any more progress.
		 */

		while (list_empty(&incoming)) {
			LIST_HEAD(more_pending);
			LIST_HEAD(more_incoming);
			bool made_progress;

			/* It is ok to look outside the lock,
			 * it's only an optimization anyways */
			if (list_empty(&device->submit.writes))
				break;

			spin_lock_irq(&device->resource->req_lock);
			list_splice_tail_init(&device->submit.writes, &more_incoming);
			spin_unlock_irq(&device->resource->req_lock);

			if (list_empty(&more_incoming))
				break;

			made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);

			list_splice_tail_init(&more_pending, &pending);
			list_splice_tail_init(&more_incoming, &incoming);
			if (!made_progress)
				break;
		}

		drbd_al_begin_io_commit(device);
		send_and_submit_pending(device, &pending);
	}
}
blk_qc_t drbd_make_request(struct request_queue *q, struct bio *bio)
{
	struct drbd_device *device = (struct drbd_device *) q->queuedata;
	unsigned long start_jif;

	blk_queue_split(q, &bio, q->bio_split);

	start_jif = jiffies;

	/*
	 * what we "blindly" assume:
	 */
	D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));

	inc_ap_bio(device);
	__drbd_make_request(device, bio, start_jif);
	return BLK_QC_T_NONE;
}

static bool net_timeout_reached(struct drbd_request *net_req,
		struct drbd_connection *connection,
		unsigned long now, unsigned long ent,
		unsigned int ko_count, unsigned int timeout)
{
	struct drbd_device *device = net_req->device;

	if (!time_after(now, net_req->pre_send_jif + ent))
		return false;

	if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
		return false;

	if (net_req->rq_state & RQ_NET_PENDING) {
		drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
			jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
		return true;
	}

	/* We received an ACK already (or are using protocol A),
	 * but are waiting for the epoch closing barrier ack.
	 * Check if we sent the barrier already.  We should not blame the peer
	 * for being unresponsive, if we did not even ask it yet. */
	if (net_req->epoch == connection->send.current_epoch_nr) {
		drbd_warn(device,
			"We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
			jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
		return false;
	}

	/* Worst case: we may have been blocked for whatever reason, then
	 * suddenly are able to send a lot of requests (and epoch separating
	 * barriers) in quick succession.
	 * The timestamp of the net_req may be much too old and not correspond
	 * to the sending time of the relevant unack'ed barrier packet, so
	 * would trigger a spurious timeout.  The latest barrier packet may
	 * have a too recent timestamp to trigger the timeout, potentially miss
	 * a timeout.  Right now we don't have a place to conveniently store
	 * these timestamps.
	 * But in this particular situation, the application requests are still
	 * completed to upper layers, DRBD should still "feel" responsive.
	 * No need yet to kill this connection, it may still recover.
	 * If not, eventually we will have queued enough into the network for
	 * us to block. From that point of view, the timestamp of the last sent
	 * barrier packet is relevant enough.
	 */
	if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
		drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
			connection->send.last_sent_barrier_jif, now,
			jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
		return true;
	}
	return false;
}

/* A request is considered timed out, if
 * - we have some effective timeout from the configuration,
 *   with some state restrictions applied,
 * - the oldest request is waiting for a response from the network
 *   resp. the local disk,
 * - the oldest request is in fact older than the effective timeout,
 * - the connection was established (resp. disk was attached)
 *   for longer than the timeout already.
 * Note that for 32bit jiffies and very stable connections/disks,
 * we may have a wrap around, which is caught by
 *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
 *
 * Side effect: once per 32bit wrap-around interval, which means every
 * ~198 days with 250 HZ, we have a window where the timeout would need
 * to expire twice (worst case) to become effective. Good enough.
 */
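/* The effective network timeout is ko-count * timeout, both taken from the
 * net config; "timeout" is configured in tenths of a second, hence the
 * "* HZ/10" conversion to jiffies below.  As an illustration (values chosen
 * only as an example), timeout=60 (6 s) and ko-count=7 give an effective
 * network timeout of 42 s.  The disk timeout (disk-timeout, also in tenths
 * of a second) is converted the same way and evaluated independently. */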
void request_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;
	struct drbd_connection *connection = first_peer_device(device)->connection;
	struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
	struct net_conf *nc;
	unsigned long oldest_submit_jif;
	unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
	unsigned long now;
	unsigned int ko_count = 0, timeout = 0;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
		ko_count = nc->ko_count;
		timeout = nc->timeout;
	}

	if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
		dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
		put_ldev(device);
	}
	rcu_read_unlock();

	ent = timeout * HZ/10 * ko_count;
	et = min_not_zero(dt, ent);

	if (!et)
		return; /* Recurring timer stopped */

	now = jiffies;
	nt = now + et;

	spin_lock_irq(&device->resource->req_lock);
	req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
	req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);

	/* maybe the oldest request waiting for the peer is in fact still
	 * blocking in tcp sendmsg.  That's ok, though, that's handled via the
	 * socket send timeout, requesting a ping, and bumping ko-count in
	 * we_should_drop_the_connection().
	 */

	/* check the oldest request we successfully sent,
	 * but which is still waiting for an ACK. */
	req_peer = connection->req_ack_pending;

	/* if we don't have such a request (e.g. protocol A)
	 * check the oldest request which is still waiting on its epoch
	 * closing barrier ack. */
	if (!req_peer)
		req_peer = connection->req_not_net_done;

	/* evaluate the oldest peer request only in one timer! */
	if (req_peer && req_peer->device != device)
		req_peer = NULL;

	/* do we have something to evaluate? */
	if (req_peer == NULL && req_write == NULL && req_read == NULL)
		goto out;

	oldest_submit_jif =
		(req_write && req_read)
		? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
		  ? req_write->pre_submit_jif : req_read->pre_submit_jif )
		: req_write ? req_write->pre_submit_jif
		: req_read ? req_read->pre_submit_jif : now;

	if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
		_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);

	if (dt && oldest_submit_jif != now &&
		 time_after(now, oldest_submit_jif + dt) &&
		!time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
		drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
		__drbd_chk_io_error(device, DRBD_FORCE_DETACH);
	}

	/* Reschedule timer for the nearest not already expired timeout.
	 * Fallback to now + min(effective network timeout, disk timeout). */
	ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
		? req_peer->pre_send_jif + ent : now + et;
	dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
		? oldest_submit_jif + dt : now + et;
	nt = time_before(ent, dt) ? ent : dt;
out:
	spin_unlock_irq(&device->resource->req_lock);
	mod_timer(&device->request_timer, nt);
}