1 /* 2 * QEMU block throttling group infrastructure 3 * 4 * Copyright (C) Nodalink, EURL. 2014 5 * Copyright (C) Igalia, S.L. 2015 6 * 7 * Authors: 8 * Benoît Canet <benoit.canet@nodalink.com> 9 * Alberto Garcia <berto@igalia.com> 10 * 11 * This program is free software; you can redistribute it and/or 12 * modify it under the terms of the GNU General Public License as 13 * published by the Free Software Foundation; either version 2 or 14 * (at your option) version 3 of the License. 15 * 16 * This program is distributed in the hope that it will be useful, 17 * but WITHOUT ANY WARRANTY; without even the implied warranty of 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19 * GNU General Public License for more details. 20 * 21 * You should have received a copy of the GNU General Public License 22 * along with this program; if not, see <http://www.gnu.org/licenses/>. 23 */ 24 25 #include "block/throttle-groups.h" 26 #include "qemu/queue.h" 27 #include "qemu/thread.h" 28 #include "sysemu/qtest.h" 29 30 /* The ThrottleGroup structure (with its ThrottleState) is shared 31 * among different BlockDriverState and it's independent from 32 * AioContext, so in order to use it from different threads it needs 33 * its own locking. 34 * 35 * This locking is however handled internally in this file, so it's 36 * mostly transparent to outside users (but see the documentation in 37 * throttle_group_lock()). 38 * 39 * The whole ThrottleGroup structure is private and invisible to 40 * outside users, that only use it through its ThrottleState. 41 * 42 * In addition to the ThrottleGroup structure, BlockDriverState has 43 * fields that need to be accessed by other members of the group and 44 * therefore also need to be protected by this lock. Once a BDS is 45 * registered in a group those fields can be accessed by other threads 46 * any time. 47 * 48 * Again, all this is handled internally and is mostly transparent to 49 * the outside. 
The 'throttle_timers' field however has an additional 50 * constraint because it may be temporarily invalid (see for example 51 * bdrv_set_aio_context()). Therefore in this file a thread will 52 * access some other BDS's timers only after verifying that that BDS 53 * has throttled requests in the queue. 54 */ 55 typedef struct ThrottleGroup { 56 char *name; /* This is constant during the lifetime of the group */ 57 58 QemuMutex lock; /* This lock protects the following four fields */ 59 ThrottleState ts; 60 QLIST_HEAD(, BlockDriverState) head; 61 BlockDriverState *tokens[2]; 62 bool any_timer_armed[2]; 63 64 /* These two are protected by the global throttle_groups_lock */ 65 unsigned refcount; 66 QTAILQ_ENTRY(ThrottleGroup) list; 67 } ThrottleGroup; 68 69 static QemuMutex throttle_groups_lock; 70 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups = 71 QTAILQ_HEAD_INITIALIZER(throttle_groups); 72 73 /* Increments the reference count of a ThrottleGroup given its name. 74 * 75 * If no ThrottleGroup is found with the given name a new one is 76 * created. 77 * 78 * @name: the name of the ThrottleGroup 79 * @ret: the ThrottleGroup 80 */ 81 static ThrottleGroup *throttle_group_incref(const char *name) 82 { 83 ThrottleGroup *tg = NULL; 84 ThrottleGroup *iter; 85 86 qemu_mutex_lock(&throttle_groups_lock); 87 88 /* Look for an existing group with that name */ 89 QTAILQ_FOREACH(iter, &throttle_groups, list) { 90 if (!strcmp(name, iter->name)) { 91 tg = iter; 92 break; 93 } 94 } 95 96 /* Create a new one if not found */ 97 if (!tg) { 98 tg = g_new0(ThrottleGroup, 1); 99 tg->name = g_strdup(name); 100 qemu_mutex_init(&tg->lock); 101 throttle_init(&tg->ts); 102 QLIST_INIT(&tg->head); 103 104 QTAILQ_INSERT_TAIL(&throttle_groups, tg, list); 105 } 106 107 tg->refcount++; 108 109 qemu_mutex_unlock(&throttle_groups_lock); 110 111 return tg; 112 } 113 114 /* Decrease the reference count of a ThrottleGroup. 
115 * 116 * When the reference count reaches zero the ThrottleGroup is 117 * destroyed. 118 * 119 * @tg: The ThrottleGroup to unref 120 */ 121 static void throttle_group_unref(ThrottleGroup *tg) 122 { 123 qemu_mutex_lock(&throttle_groups_lock); 124 if (--tg->refcount == 0) { 125 QTAILQ_REMOVE(&throttle_groups, tg, list); 126 qemu_mutex_destroy(&tg->lock); 127 g_free(tg->name); 128 g_free(tg); 129 } 130 qemu_mutex_unlock(&throttle_groups_lock); 131 } 132 133 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and 134 * the pointer) is guaranteed to remain constant during the lifetime 135 * of the group. 136 * 137 * @bs: a BlockDriverState that is member of a throttling group 138 * @ret: the name of the group. 139 */ 140 const char *throttle_group_get_name(BlockDriverState *bs) 141 { 142 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 143 return tg->name; 144 } 145 146 /* Return the next BlockDriverState in the round-robin sequence, 147 * simulating a circular list. 148 * 149 * This assumes that tg->lock is held. 150 * 151 * @bs: the current BlockDriverState 152 * @ret: the next BlockDriverState in the sequence 153 */ 154 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs) 155 { 156 ThrottleState *ts = bs->throttle_state; 157 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 158 BlockDriverState *next = QLIST_NEXT(bs, round_robin); 159 160 if (!next) { 161 return QLIST_FIRST(&tg->head); 162 } 163 164 return next; 165 } 166 167 /* Return the next BlockDriverState in the round-robin sequence with 168 * pending I/O requests. 169 * 170 * This assumes that tg->lock is held. 171 * 172 * @bs: the current BlockDriverState 173 * @is_write: the type of operation (read/write) 174 * @ret: the next BlockDriverState with pending requests, or bs 175 * if there is none. 
176 */ 177 static BlockDriverState *next_throttle_token(BlockDriverState *bs, 178 bool is_write) 179 { 180 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 181 BlockDriverState *token, *start; 182 183 start = token = tg->tokens[is_write]; 184 185 /* get next bs round in round robin style */ 186 token = throttle_group_next_bs(token); 187 while (token != start && !token->pending_reqs[is_write]) { 188 token = throttle_group_next_bs(token); 189 } 190 191 /* If no IO are queued for scheduling on the next round robin token 192 * then decide the token is the current bs because chances are 193 * the current bs get the current request queued. 194 */ 195 if (token == start && !token->pending_reqs[is_write]) { 196 token = bs; 197 } 198 199 return token; 200 } 201 202 /* Check if the next I/O request for a BlockDriverState needs to be 203 * throttled or not. If there's no timer set in this group, set one 204 * and update the token accordingly. 205 * 206 * This assumes that tg->lock is held. 207 * 208 * @bs: the current BlockDriverState 209 * @is_write: the type of operation (read/write) 210 * @ret: whether the I/O request needs to be throttled or not 211 */ 212 static bool throttle_group_schedule_timer(BlockDriverState *bs, 213 bool is_write) 214 { 215 ThrottleState *ts = bs->throttle_state; 216 ThrottleTimers *tt = &bs->throttle_timers; 217 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 218 bool must_wait; 219 220 /* Check if any of the timers in this group is already armed */ 221 if (tg->any_timer_armed[is_write]) { 222 return true; 223 } 224 225 must_wait = throttle_schedule_timer(ts, tt, is_write); 226 227 /* If a timer just got armed, set bs as the current token */ 228 if (must_wait) { 229 tg->tokens[is_write] = bs; 230 tg->any_timer_armed[is_write] = true; 231 } 232 233 return must_wait; 234 } 235 236 /* Look for the next pending I/O request and schedule it. 237 * 238 * This assumes that tg->lock is held. 
239 * 240 * @bs: the current BlockDriverState 241 * @is_write: the type of operation (read/write) 242 */ 243 static void schedule_next_request(BlockDriverState *bs, bool is_write) 244 { 245 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 246 bool must_wait; 247 BlockDriverState *token; 248 249 /* Check if there's any pending request to schedule next */ 250 token = next_throttle_token(bs, is_write); 251 if (!token->pending_reqs[is_write]) { 252 return; 253 } 254 255 /* Set a timer for the request if it needs to be throttled */ 256 must_wait = throttle_group_schedule_timer(token, is_write); 257 258 /* If it doesn't have to wait, queue it for immediate execution */ 259 if (!must_wait) { 260 /* Give preference to requests from the current bs */ 261 if (qemu_in_coroutine() && 262 qemu_co_queue_next(&bs->throttled_reqs[is_write])) { 263 token = bs; 264 } else { 265 ThrottleTimers *tt = &token->throttle_timers; 266 int64_t now = qemu_clock_get_ns(tt->clock_type); 267 timer_mod(tt->timers[is_write], now + 1); 268 tg->any_timer_armed[is_write] = true; 269 } 270 tg->tokens[is_write] = token; 271 } 272 } 273 274 /* Check if an I/O request needs to be throttled, wait and set a timer 275 * if necessary, and schedule the next request using a round robin 276 * algorithm. 277 * 278 * @bs: the current BlockDriverState 279 * @bytes: the number of bytes for this I/O 280 * @is_write: the type of operation (read/write) 281 */ 282 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs, 283 unsigned int bytes, 284 bool is_write) 285 { 286 bool must_wait; 287 BlockDriverState *token; 288 289 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 290 qemu_mutex_lock(&tg->lock); 291 292 /* First we check if this I/O has to be throttled. 
*/ 293 token = next_throttle_token(bs, is_write); 294 must_wait = throttle_group_schedule_timer(token, is_write); 295 296 /* Wait if there's a timer set or queued requests of this type */ 297 if (must_wait || bs->pending_reqs[is_write]) { 298 bs->pending_reqs[is_write]++; 299 qemu_mutex_unlock(&tg->lock); 300 qemu_co_queue_wait(&bs->throttled_reqs[is_write]); 301 qemu_mutex_lock(&tg->lock); 302 bs->pending_reqs[is_write]--; 303 } 304 305 /* The I/O will be executed, so do the accounting */ 306 throttle_account(bs->throttle_state, is_write, bytes); 307 308 /* Schedule the next request */ 309 schedule_next_request(bs, is_write); 310 311 qemu_mutex_unlock(&tg->lock); 312 } 313 314 /* Update the throttle configuration for a particular group. Similar 315 * to throttle_config(), but guarantees atomicity within the 316 * throttling group. 317 * 318 * @bs: a BlockDriverState that is member of the group 319 * @cfg: the configuration to set 320 */ 321 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg) 322 { 323 ThrottleTimers *tt = &bs->throttle_timers; 324 ThrottleState *ts = bs->throttle_state; 325 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 326 qemu_mutex_lock(&tg->lock); 327 /* throttle_config() cancels the timers */ 328 if (timer_pending(tt->timers[0])) { 329 tg->any_timer_armed[0] = false; 330 } 331 if (timer_pending(tt->timers[1])) { 332 tg->any_timer_armed[1] = false; 333 } 334 throttle_config(ts, tt, cfg); 335 qemu_mutex_unlock(&tg->lock); 336 } 337 338 /* Get the throttle configuration from a particular group. Similar to 339 * throttle_get_config(), but guarantees atomicity within the 340 * throttling group. 
341 * 342 * @bs: a BlockDriverState that is member of the group 343 * @cfg: the configuration will be written here 344 */ 345 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg) 346 { 347 ThrottleState *ts = bs->throttle_state; 348 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 349 qemu_mutex_lock(&tg->lock); 350 throttle_get_config(ts, cfg); 351 qemu_mutex_unlock(&tg->lock); 352 } 353 354 /* ThrottleTimers callback. This wakes up a request that was waiting 355 * because it had been throttled. 356 * 357 * @bs: the BlockDriverState whose request had been throttled 358 * @is_write: the type of operation (read/write) 359 */ 360 static void timer_cb(BlockDriverState *bs, bool is_write) 361 { 362 ThrottleState *ts = bs->throttle_state; 363 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 364 bool empty_queue; 365 366 /* The timer has just been fired, so we can update the flag */ 367 qemu_mutex_lock(&tg->lock); 368 tg->any_timer_armed[is_write] = false; 369 qemu_mutex_unlock(&tg->lock); 370 371 /* Run the request that was waiting for this timer */ 372 empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]); 373 374 /* If the request queue was empty then we have to take care of 375 * scheduling the next one */ 376 if (empty_queue) { 377 qemu_mutex_lock(&tg->lock); 378 schedule_next_request(bs, is_write); 379 qemu_mutex_unlock(&tg->lock); 380 } 381 } 382 383 static void read_timer_cb(void *opaque) 384 { 385 timer_cb(opaque, false); 386 } 387 388 static void write_timer_cb(void *opaque) 389 { 390 timer_cb(opaque, true); 391 } 392 393 /* Register a BlockDriverState in the throttling group, also 394 * initializing its timers and updating its throttle_state pointer to 395 * point to it. If a throttling group with that name does not exist 396 * yet, it will be created. 
397 * 398 * @bs: the BlockDriverState to insert 399 * @groupname: the name of the group 400 */ 401 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname) 402 { 403 int i; 404 ThrottleGroup *tg = throttle_group_incref(groupname); 405 int clock_type = QEMU_CLOCK_REALTIME; 406 407 if (qtest_enabled()) { 408 /* For testing block IO throttling only */ 409 clock_type = QEMU_CLOCK_VIRTUAL; 410 } 411 412 bs->throttle_state = &tg->ts; 413 414 qemu_mutex_lock(&tg->lock); 415 /* If the ThrottleGroup is new set this BlockDriverState as the token */ 416 for (i = 0; i < 2; i++) { 417 if (!tg->tokens[i]) { 418 tg->tokens[i] = bs; 419 } 420 } 421 422 QLIST_INSERT_HEAD(&tg->head, bs, round_robin); 423 424 throttle_timers_init(&bs->throttle_timers, 425 bdrv_get_aio_context(bs), 426 clock_type, 427 read_timer_cb, 428 write_timer_cb, 429 bs); 430 431 qemu_mutex_unlock(&tg->lock); 432 } 433 434 /* Unregister a BlockDriverState from its group, removing it from the 435 * list, destroying the timers and setting the throttle_state pointer 436 * to NULL. 437 * 438 * The group will be destroyed if it's empty after this operation. 439 * 440 * @bs: the BlockDriverState to remove 441 */ 442 void throttle_group_unregister_bs(BlockDriverState *bs) 443 { 444 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 445 int i; 446 447 qemu_mutex_lock(&tg->lock); 448 for (i = 0; i < 2; i++) { 449 if (tg->tokens[i] == bs) { 450 BlockDriverState *token = throttle_group_next_bs(bs); 451 /* Take care of the case where this is the last bs in the group */ 452 if (token == bs) { 453 token = NULL; 454 } 455 tg->tokens[i] = token; 456 } 457 } 458 459 /* remove the current bs from the list */ 460 QLIST_REMOVE(bs, round_robin); 461 throttle_timers_destroy(&bs->throttle_timers); 462 qemu_mutex_unlock(&tg->lock); 463 464 throttle_group_unref(tg); 465 bs->throttle_state = NULL; 466 } 467 468 /* Acquire the lock of this throttling group. 
469 * 470 * You won't normally need to use this. None of the functions from the 471 * ThrottleGroup API require you to acquire the lock since all of them 472 * deal with it internally. 473 * 474 * This should only be used in exceptional cases when you want to 475 * access the protected fields of a BlockDriverState directly 476 * (e.g. bdrv_swap()). 477 * 478 * @bs: a BlockDriverState that is member of the group 479 */ 480 void throttle_group_lock(BlockDriverState *bs) 481 { 482 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 483 qemu_mutex_lock(&tg->lock); 484 } 485 486 /* Release the lock of this throttling group. 487 * 488 * See the comments in throttle_group_lock(). 489 */ 490 void throttle_group_unlock(BlockDriverState *bs) 491 { 492 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 493 qemu_mutex_unlock(&tg->lock); 494 } 495 496 static void throttle_groups_init(void) 497 { 498 qemu_mutex_init(&throttle_groups_lock); 499 } 500 501 block_init(throttle_groups_init); 502