/*
 * QEMU block throttling group infrastructure
 *
 * Copyright (C) Nodalink, EURL. 2014
 * Copyright (C) Igalia, S.L. 2015
 *
 * Authors:
 *   Benoît Canet <benoit.canet@nodalink.com>
 *   Alberto Garcia <berto@igalia.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 or
 * (at your option) version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses/>.
 */

#include "qemu/osdep.h"
#include "sysemu/block-backend.h"
#include "block/throttle-groups.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "sysemu/qtest.h"

/* The ThrottleGroup structure (with its ThrottleState) is shared
 * among different BlockBackends and it is independent of the
 * AioContext, so in order to use it from different threads it needs
 * its own locking.
 *
 * This locking is however handled internally in this file, so it's
 * transparent to outside users.
 *
 * The whole ThrottleGroup structure is private and invisible to
 * outside users, who only use it through its ThrottleState.
 *
 * In addition to the ThrottleGroup structure, BlockBackendPublic has
 * fields that need to be accessed by other members of the group and
 * therefore also need to be protected by this lock. Once a
 * BlockBackend is registered in a group those fields can be accessed
 * by other threads at any time.
 *
 * Again, all this is handled internally and is mostly transparent to
 * the outside. The 'throttle_timers' field, however, has an additional
 * constraint because it may be temporarily invalid (see for example
 * blk_set_aio_context()). Therefore in this file a thread will
 * access some other BlockBackend's timers only after verifying that
 * that BlockBackend has throttled requests in the queue.
 */
typedef struct ThrottleGroup {
    char *name; /* This is constant during the lifetime of the group */

    QemuMutex lock; /* This lock protects the following fields */
    ThrottleState ts;
    QLIST_HEAD(, BlockBackendPublic) head;
    BlockBackend *tokens[2];
    bool any_timer_armed[2];
    QEMUClockType clock_type;

    /* These two are protected by the global throttle_groups_lock */
    unsigned refcount;
    QTAILQ_ENTRY(ThrottleGroup) list;
} ThrottleGroup;

static QemuMutex throttle_groups_lock;
static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
    QTAILQ_HEAD_INITIALIZER(throttle_groups);

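/* Note on lock usage in this file: throttle_groups_lock only protects
 * the global list of groups and each group's refcount, while tg->lock
 * protects the group's runtime state. No function in this file holds
 * both locks at the same time. */
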
/* Increment the reference count of a ThrottleGroup given its name.
 *
 * If no ThrottleGroup is found with the given name a new one is
 * created.
 *
 * @name: the name of the ThrottleGroup
 * @ret: the ThrottleState member of the ThrottleGroup
 */
ThrottleState *throttle_group_incref(const char *name)
{
    ThrottleGroup *tg = NULL;
    ThrottleGroup *iter;

    qemu_mutex_lock(&throttle_groups_lock);

    /* Look for an existing group with that name */
    QTAILQ_FOREACH(iter, &throttle_groups, list) {
        if (!strcmp(name, iter->name)) {
            tg = iter;
            break;
        }
    }

    /* Create a new one if not found */
    if (!tg) {
        tg = g_new0(ThrottleGroup, 1);
        tg->name = g_strdup(name);
        tg->clock_type = QEMU_CLOCK_REALTIME;

        if (qtest_enabled()) {
            /* For testing block IO throttling only */
            tg->clock_type = QEMU_CLOCK_VIRTUAL;
        }
        qemu_mutex_init(&tg->lock);
        throttle_init(&tg->ts);
        QLIST_INIT(&tg->head);

        QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
    }

    tg->refcount++;

    qemu_mutex_unlock(&throttle_groups_lock);

    return &tg->ts;
}

/* Decrease the reference count of a ThrottleGroup.
 *
 * When the reference count reaches zero the ThrottleGroup is
 * destroyed.
 *
 * @ts: The ThrottleGroup to unref, given by its ThrottleState member
 */
void throttle_group_unref(ThrottleState *ts)
{
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);

    qemu_mutex_lock(&throttle_groups_lock);
    if (--tg->refcount == 0) {
        QTAILQ_REMOVE(&throttle_groups, tg, list);
        qemu_mutex_destroy(&tg->lock);
        g_free(tg->name);
        g_free(tg);
    }
    qemu_mutex_unlock(&throttle_groups_lock);
}

/* Get the name from a BlockBackend's ThrottleGroup. The name (and the pointer)
 * is guaranteed to remain constant during the lifetime of the group.
 *
 * @blk: a BlockBackend that is a member of a throttling group
 * @ret: the name of the group.
 */
const char *throttle_group_get_name(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    return tg->name;
}

/* Return the next BlockBackend in the round-robin sequence, simulating a
 * circular list.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the current BlockBackend
 * @ret: the next BlockBackend in the sequence
 */
static BlockBackend *throttle_group_next_blk(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    BlockBackendPublic *next = QLIST_NEXT(blkp, round_robin);

    if (!next) {
        next = QLIST_FIRST(&tg->head);
    }

    return blk_by_public(next);
}

/*
 * Return whether a BlockBackend has pending requests.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the BlockBackend
 * @is_write: the type of operation (read/write)
 * @ret: whether the BlockBackend has pending requests.
 */
static inline bool blk_has_pending_reqs(BlockBackend *blk,
                                        bool is_write)
{
    const BlockBackendPublic *blkp = blk_get_public(blk);
    return blkp->pending_reqs[is_write];
}

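/* Example of the token scheme implemented by next_throttle_token() below:
 * if a group has members A, B and C and tg->tokens[is_write] is A, the
 * search starts at B, continues with C and finally checks A itself,
 * returning the first member with pending requests of that type. Only if
 * none of them has pending requests does the caller's own BlockBackend
 * become the token. */
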
/* Return the next BlockBackend in the round-robin sequence with pending I/O
 * requests.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the current BlockBackend
 * @is_write: the type of operation (read/write)
 * @ret: the next BlockBackend with pending requests, or blk if there is
 *       none.
 */
static BlockBackend *next_throttle_token(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    BlockBackend *token, *start;

    start = token = tg->tokens[is_write];

    /* get the next blk in round-robin style */
    token = throttle_group_next_blk(token);
    while (token != start && !blk_has_pending_reqs(token, is_write)) {
        token = throttle_group_next_blk(token);
    }

    /* If no I/O is queued for scheduling on the next round-robin token
     * then make the token point to the current blk, since chances are
     * the current blk is the one that will get the current request
     * queued.
     */
    if (token == start && !blk_has_pending_reqs(token, is_write)) {
        token = blk;
    }

    /* Either we return the original BB, or one with pending requests */
    assert(token == blk || blk_has_pending_reqs(token, is_write));

    return token;
}

/* Check if the next I/O request for a BlockBackend needs to be throttled or
 * not. If there's no timer set in this group, set one and update the token
 * accordingly.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the current BlockBackend
 * @is_write: the type of operation (read/write)
 * @ret: whether the I/O request needs to be throttled or not
 */
static bool throttle_group_schedule_timer(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleTimers *tt = &blkp->throttle_timers;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    bool must_wait;

    if (atomic_read(&blkp->io_limits_disabled)) {
        return false;
    }

    /* Check if any of the timers in this group is already armed */
    if (tg->any_timer_armed[is_write]) {
        return true;
    }

    must_wait = throttle_schedule_timer(ts, tt, is_write);

    /* If a timer just got armed, set blk as the current token */
    if (must_wait) {
        tg->tokens[is_write] = blk;
        tg->any_timer_armed[is_write] = true;
    }

    return must_wait;
}

/* Start the next pending I/O request for a BlockBackend. Return whether
 * any request was actually pending.
 *
 * @blk: the current BlockBackend
 * @is_write: the type of operation (read/write)
 */
static bool coroutine_fn throttle_group_co_restart_queue(BlockBackend *blk,
                                                         bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    bool ret;

    qemu_co_mutex_lock(&blkp->throttled_reqs_lock);
    ret = qemu_co_queue_next(&blkp->throttled_reqs[is_write]);
    qemu_co_mutex_unlock(&blkp->throttled_reqs_lock);

    return ret;
}

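/* Invariant kept by the scheduling code in this file: at most one
 * BlockBackend per group has an armed throttle timer for each request
 * type. While a timer is armed, tg->any_timer_armed[is_write] is true
 * and tg->tokens[is_write] points to the BlockBackend that owns the
 * timer; timer_cb() clears the flag when the timer fires. */
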
/* Look for the next pending I/O request and schedule it.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the current BlockBackend
 * @is_write: the type of operation (read/write)
 */
static void schedule_next_request(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    bool must_wait;
    BlockBackend *token;

    /* Check if there's any pending request to schedule next */
    token = next_throttle_token(blk, is_write);
    if (!blk_has_pending_reqs(token, is_write)) {
        return;
    }

    /* Set a timer for the request if it needs to be throttled */
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* If it doesn't have to wait, queue it for immediate execution */
    if (!must_wait) {
        /* Give preference to requests from the current blk */
        if (qemu_in_coroutine() &&
            throttle_group_co_restart_queue(blk, is_write)) {
            token = blk;
        } else {
            ThrottleTimers *tt = &blk_get_public(token)->throttle_timers;
            int64_t now = qemu_clock_get_ns(tg->clock_type);
            timer_mod(tt->timers[is_write], now);
            tg->any_timer_armed[is_write] = true;
        }
        tg->tokens[is_write] = token;
    }
}

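/* Note for throttle_group_co_io_limits_intercept() below: tg->lock is
 * dropped before the coroutine yields in qemu_co_queue_wait() and taken
 * again afterwards, so the group lock is never held while a throttled
 * request is sleeping. */
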
/* Check if an I/O request needs to be throttled, wait and set a timer
 * if necessary, and schedule the next request using a round robin
 * algorithm.
 *
 * @blk: the current BlockBackend
 * @bytes: the number of bytes for this I/O
 * @is_write: the type of operation (read/write)
 */
void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
                                                        unsigned int bytes,
                                                        bool is_write)
{
    bool must_wait;
    BlockBackend *token;

    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);

    /* First we check if this I/O has to be throttled. */
    token = next_throttle_token(blk, is_write);
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* Wait if there's a timer set or queued requests of this type */
    if (must_wait || blkp->pending_reqs[is_write]) {
        blkp->pending_reqs[is_write]++;
        qemu_mutex_unlock(&tg->lock);
        qemu_co_mutex_lock(&blkp->throttled_reqs_lock);
        qemu_co_queue_wait(&blkp->throttled_reqs[is_write],
                           &blkp->throttled_reqs_lock);
        qemu_co_mutex_unlock(&blkp->throttled_reqs_lock);
        qemu_mutex_lock(&tg->lock);
        blkp->pending_reqs[is_write]--;
    }

    /* The I/O will be executed, so do the accounting */
    throttle_account(blkp->throttle_state, is_write, bytes);

    /* Schedule the next request */
    schedule_next_request(blk, is_write);

    qemu_mutex_unlock(&tg->lock);
}

typedef struct {
    BlockBackend *blk;
    bool is_write;
} RestartData;

static void coroutine_fn throttle_group_restart_queue_entry(void *opaque)
{
    RestartData *data = opaque;
    BlockBackend *blk = data->blk;
    bool is_write = data->is_write;
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    bool empty_queue;

    empty_queue = !throttle_group_co_restart_queue(blk, is_write);

    /* If the request queue was empty then we have to take care of
     * scheduling the next one */
    if (empty_queue) {
        qemu_mutex_lock(&tg->lock);
        schedule_next_request(blk, is_write);
        qemu_mutex_unlock(&tg->lock);
    }
}

static void throttle_group_restart_queue(BlockBackend *blk, bool is_write)
{
    Coroutine *co;
    RestartData rd = {
        .blk = blk,
        .is_write = is_write
    };

    co = qemu_coroutine_create(throttle_group_restart_queue_entry, &rd);
    aio_co_enter(blk_get_aio_context(blk), co);
}

void throttle_group_restart_blk(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);

    if (blkp->throttle_state) {
        throttle_group_restart_queue(blk, 0);
        throttle_group_restart_queue(blk, 1);
    }
}

/* Update the throttle configuration for a particular group. Similar
 * to throttle_config(), but guarantees atomicity within the
 * throttling group.
 *
 * @blk: a BlockBackend that is a member of the group
 * @cfg: the configuration to set
 */
void throttle_group_config(BlockBackend *blk, ThrottleConfig *cfg)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);
    throttle_config(ts, tg->clock_type, cfg);
    qemu_mutex_unlock(&tg->lock);

    throttle_group_restart_blk(blk);
}

/* Get the throttle configuration from a particular group. Similar to
 * throttle_get_config(), but guarantees atomicity within the
 * throttling group.
 *
 * @blk: a BlockBackend that is a member of the group
 * @cfg: the configuration will be written here
 */
void throttle_group_get_config(BlockBackend *blk, ThrottleConfig *cfg)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);
    throttle_get_config(ts, cfg);
    qemu_mutex_unlock(&tg->lock);
}

/* ThrottleTimers callback. This wakes up a request that was waiting
 * because it had been throttled.
 *
 * @blk: the BlockBackend whose request had been throttled
 * @is_write: the type of operation (read/write)
 */
static void timer_cb(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);

    /* The timer has just been fired, so we can update the flag */
    qemu_mutex_lock(&tg->lock);
    tg->any_timer_armed[is_write] = false;
    qemu_mutex_unlock(&tg->lock);

    /* Run the request that was waiting for this timer */
    throttle_group_restart_queue(blk, is_write);
}

static void read_timer_cb(void *opaque)
{
    timer_cb(opaque, false);
}

static void write_timer_cb(void *opaque)
{
    timer_cb(opaque, true);
}

/* Register a BlockBackend in the throttling group, also initializing its
 * timers and updating its throttle_state pointer to point to it. If a
 * throttling group with that name does not exist yet, it will be created.
 *
 * @blk: the BlockBackend to insert
 * @groupname: the name of the group
 */
void throttle_group_register_blk(BlockBackend *blk, const char *groupname)
{
    int i;
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = throttle_group_incref(groupname);
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    blkp->throttle_state = ts;

    qemu_mutex_lock(&tg->lock);
    /* If the ThrottleGroup is new set this BlockBackend as the token */
    for (i = 0; i < 2; i++) {
        if (!tg->tokens[i]) {
            tg->tokens[i] = blk;
        }
    }

    QLIST_INSERT_HEAD(&tg->head, blkp, round_robin);

    throttle_timers_init(&blkp->throttle_timers,
                         blk_get_aio_context(blk),
                         tg->clock_type,
                         read_timer_cb,
                         write_timer_cb,
                         blk);

    qemu_mutex_unlock(&tg->lock);
}

/* Unregister a BlockBackend from its group, removing it from the list,
 * destroying the timers and setting the throttle_state pointer to NULL.
 *
 * The BlockBackend must not have pending throttled requests, so the caller has
 * to drain them first.
 *
 * The group will be destroyed if it's empty after this operation.
 *
 * @blk: the BlockBackend to remove
 */
void throttle_group_unregister_blk(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    int i;

    assert(blkp->pending_reqs[0] == 0 && blkp->pending_reqs[1] == 0);
    assert(qemu_co_queue_empty(&blkp->throttled_reqs[0]));
    assert(qemu_co_queue_empty(&blkp->throttled_reqs[1]));

    qemu_mutex_lock(&tg->lock);
    for (i = 0; i < 2; i++) {
        if (tg->tokens[i] == blk) {
            BlockBackend *token = throttle_group_next_blk(blk);
            /* Take care of the case where this is the last blk in the group */
            if (token == blk) {
                token = NULL;
            }
            tg->tokens[i] = token;
        }
    }

    /* remove the current blk from the list */
    QLIST_REMOVE(blkp, round_robin);
    throttle_timers_destroy(&blkp->throttle_timers);
    qemu_mutex_unlock(&tg->lock);

    throttle_group_unref(&tg->ts);
    blkp->throttle_state = NULL;
}

static void throttle_groups_init(void)
{
    qemu_mutex_init(&throttle_groups_lock);
}

block_init(throttle_groups_init);
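
/* Typical usage of this API, as described by the comments above: a
 * BlockBackend joins a group with throttle_group_register_blk(blk,
 * groupname), its I/O path calls
 * throttle_group_co_io_limits_intercept(blk, bytes, is_write) before
 * submitting each request, limits are changed at runtime with
 * throttle_group_config(), and the backend leaves the group with
 * throttle_group_unregister_blk() once its throttled requests have
 * been drained. */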