1 /* 2 * QEMU block throttling group infrastructure 3 * 4 * Copyright (C) Nodalink, EURL. 2014 5 * Copyright (C) Igalia, S.L. 2015 6 * 7 * Authors: 8 * Benoît Canet <benoit.canet@nodalink.com> 9 * Alberto Garcia <berto@igalia.com> 10 * 11 * This program is free software; you can redistribute it and/or 12 * modify it under the terms of the GNU General Public License as 13 * published by the Free Software Foundation; either version 2 or 14 * (at your option) version 3 of the License. 15 * 16 * This program is distributed in the hope that it will be useful, 17 * but WITHOUT ANY WARRANTY; without even the implied warranty of 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19 * GNU General Public License for more details. 20 * 21 * You should have received a copy of the GNU General Public License 22 * along with this program; if not, see <http://www.gnu.org/licenses/>. 23 */ 24 25 #include "qemu/osdep.h" 26 #include "block/throttle-groups.h" 27 #include "qemu/queue.h" 28 #include "qemu/thread.h" 29 #include "sysemu/qtest.h" 30 31 /* The ThrottleGroup structure (with its ThrottleState) is shared 32 * among different BlockDriverState and it's independent from 33 * AioContext, so in order to use it from different threads it needs 34 * its own locking. 35 * 36 * This locking is however handled internally in this file, so it's 37 * transparent to outside users. 38 * 39 * The whole ThrottleGroup structure is private and invisible to 40 * outside users, that only use it through its ThrottleState. 41 * 42 * In addition to the ThrottleGroup structure, BlockDriverState has 43 * fields that need to be accessed by other members of the group and 44 * therefore also need to be protected by this lock. Once a BDS is 45 * registered in a group those fields can be accessed by other threads 46 * any time. 47 * 48 * Again, all this is handled internally and is mostly transparent to 49 * the outside. The 'throttle_timers' field however has an additional 50 * constraint because it may be temporarily invalid (see for example 51 * bdrv_set_aio_context()). Therefore in this file a thread will 52 * access some other BDS's timers only after verifying that that BDS 53 * has throttled requests in the queue. 54 */ 55 typedef struct ThrottleGroup { 56 char *name; /* This is constant during the lifetime of the group */ 57 58 QemuMutex lock; /* This lock protects the following four fields */ 59 ThrottleState ts; 60 QLIST_HEAD(, BlockDriverState) head; 61 BlockDriverState *tokens[2]; 62 bool any_timer_armed[2]; 63 64 /* These two are protected by the global throttle_groups_lock */ 65 unsigned refcount; 66 QTAILQ_ENTRY(ThrottleGroup) list; 67 } ThrottleGroup; 68 69 static QemuMutex throttle_groups_lock; 70 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups = 71 QTAILQ_HEAD_INITIALIZER(throttle_groups); 72 73 /* Increments the reference count of a ThrottleGroup given its name. 74 * 75 * If no ThrottleGroup is found with the given name a new one is 76 * created. 77 * 78 * @name: the name of the ThrottleGroup 79 * @ret: the ThrottleState member of the ThrottleGroup 80 */ 81 ThrottleState *throttle_group_incref(const char *name) 82 { 83 ThrottleGroup *tg = NULL; 84 ThrottleGroup *iter; 85 86 qemu_mutex_lock(&throttle_groups_lock); 87 88 /* Look for an existing group with that name */ 89 QTAILQ_FOREACH(iter, &throttle_groups, list) { 90 if (!strcmp(name, iter->name)) { 91 tg = iter; 92 break; 93 } 94 } 95 96 /* Create a new one if not found */ 97 if (!tg) { 98 tg = g_new0(ThrottleGroup, 1); 99 tg->name = g_strdup(name); 100 qemu_mutex_init(&tg->lock); 101 throttle_init(&tg->ts); 102 QLIST_INIT(&tg->head); 103 104 QTAILQ_INSERT_TAIL(&throttle_groups, tg, list); 105 } 106 107 tg->refcount++; 108 109 qemu_mutex_unlock(&throttle_groups_lock); 110 111 return &tg->ts; 112 } 113 114 /* Decrease the reference count of a ThrottleGroup. 115 * 116 * When the reference count reaches zero the ThrottleGroup is 117 * destroyed. 118 * 119 * @ts: The ThrottleGroup to unref, given by its ThrottleState member 120 */ 121 void throttle_group_unref(ThrottleState *ts) 122 { 123 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 124 125 qemu_mutex_lock(&throttle_groups_lock); 126 if (--tg->refcount == 0) { 127 QTAILQ_REMOVE(&throttle_groups, tg, list); 128 qemu_mutex_destroy(&tg->lock); 129 g_free(tg->name); 130 g_free(tg); 131 } 132 qemu_mutex_unlock(&throttle_groups_lock); 133 } 134 135 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and 136 * the pointer) is guaranteed to remain constant during the lifetime 137 * of the group. 138 * 139 * @bs: a BlockDriverState that is member of a throttling group 140 * @ret: the name of the group. 141 */ 142 const char *throttle_group_get_name(BlockDriverState *bs) 143 { 144 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 145 return tg->name; 146 } 147 148 /* Return the next BlockDriverState in the round-robin sequence, 149 * simulating a circular list. 150 * 151 * This assumes that tg->lock is held. 152 * 153 * @bs: the current BlockDriverState 154 * @ret: the next BlockDriverState in the sequence 155 */ 156 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs) 157 { 158 ThrottleState *ts = bs->throttle_state; 159 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 160 BlockDriverState *next = QLIST_NEXT(bs, round_robin); 161 162 if (!next) { 163 return QLIST_FIRST(&tg->head); 164 } 165 166 return next; 167 } 168 169 /* Return the next BlockDriverState in the round-robin sequence with 170 * pending I/O requests. 171 * 172 * This assumes that tg->lock is held. 173 * 174 * @bs: the current BlockDriverState 175 * @is_write: the type of operation (read/write) 176 * @ret: the next BlockDriverState with pending requests, or bs 177 * if there is none. 178 */ 179 static BlockDriverState *next_throttle_token(BlockDriverState *bs, 180 bool is_write) 181 { 182 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 183 BlockDriverState *token, *start; 184 185 start = token = tg->tokens[is_write]; 186 187 /* get next bs round in round robin style */ 188 token = throttle_group_next_bs(token); 189 while (token != start && !token->pending_reqs[is_write]) { 190 token = throttle_group_next_bs(token); 191 } 192 193 /* If no IO are queued for scheduling on the next round robin token 194 * then decide the token is the current bs because chances are 195 * the current bs get the current request queued. 196 */ 197 if (token == start && !token->pending_reqs[is_write]) { 198 token = bs; 199 } 200 201 return token; 202 } 203 204 /* Check if the next I/O request for a BlockDriverState needs to be 205 * throttled or not. If there's no timer set in this group, set one 206 * and update the token accordingly. 207 * 208 * This assumes that tg->lock is held. 209 * 210 * @bs: the current BlockDriverState 211 * @is_write: the type of operation (read/write) 212 * @ret: whether the I/O request needs to be throttled or not 213 */ 214 static bool throttle_group_schedule_timer(BlockDriverState *bs, 215 bool is_write) 216 { 217 ThrottleState *ts = bs->throttle_state; 218 ThrottleTimers *tt = &bs->throttle_timers; 219 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 220 bool must_wait; 221 222 if (bs->io_limits_disabled) { 223 return false; 224 } 225 226 /* Check if any of the timers in this group is already armed */ 227 if (tg->any_timer_armed[is_write]) { 228 return true; 229 } 230 231 must_wait = throttle_schedule_timer(ts, tt, is_write); 232 233 /* If a timer just got armed, set bs as the current token */ 234 if (must_wait) { 235 tg->tokens[is_write] = bs; 236 tg->any_timer_armed[is_write] = true; 237 } 238 239 return must_wait; 240 } 241 242 /* Look for the next pending I/O request and schedule it. 243 * 244 * This assumes that tg->lock is held. 245 * 246 * @bs: the current BlockDriverState 247 * @is_write: the type of operation (read/write) 248 */ 249 static void schedule_next_request(BlockDriverState *bs, bool is_write) 250 { 251 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 252 bool must_wait; 253 BlockDriverState *token; 254 255 /* Check if there's any pending request to schedule next */ 256 token = next_throttle_token(bs, is_write); 257 if (!token->pending_reqs[is_write]) { 258 return; 259 } 260 261 /* Set a timer for the request if it needs to be throttled */ 262 must_wait = throttle_group_schedule_timer(token, is_write); 263 264 /* If it doesn't have to wait, queue it for immediate execution */ 265 if (!must_wait) { 266 /* Give preference to requests from the current bs */ 267 if (qemu_in_coroutine() && 268 qemu_co_queue_next(&bs->throttled_reqs[is_write])) { 269 token = bs; 270 } else { 271 ThrottleTimers *tt = &token->throttle_timers; 272 int64_t now = qemu_clock_get_ns(tt->clock_type); 273 timer_mod(tt->timers[is_write], now + 1); 274 tg->any_timer_armed[is_write] = true; 275 } 276 tg->tokens[is_write] = token; 277 } 278 } 279 280 /* Check if an I/O request needs to be throttled, wait and set a timer 281 * if necessary, and schedule the next request using a round robin 282 * algorithm. 283 * 284 * @bs: the current BlockDriverState 285 * @bytes: the number of bytes for this I/O 286 * @is_write: the type of operation (read/write) 287 */ 288 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs, 289 unsigned int bytes, 290 bool is_write) 291 { 292 bool must_wait; 293 BlockDriverState *token; 294 295 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 296 qemu_mutex_lock(&tg->lock); 297 298 /* First we check if this I/O has to be throttled. */ 299 token = next_throttle_token(bs, is_write); 300 must_wait = throttle_group_schedule_timer(token, is_write); 301 302 /* Wait if there's a timer set or queued requests of this type */ 303 if (must_wait || bs->pending_reqs[is_write]) { 304 bs->pending_reqs[is_write]++; 305 qemu_mutex_unlock(&tg->lock); 306 qemu_co_queue_wait(&bs->throttled_reqs[is_write]); 307 qemu_mutex_lock(&tg->lock); 308 bs->pending_reqs[is_write]--; 309 } 310 311 /* The I/O will be executed, so do the accounting */ 312 throttle_account(bs->throttle_state, is_write, bytes); 313 314 /* Schedule the next request */ 315 schedule_next_request(bs, is_write); 316 317 qemu_mutex_unlock(&tg->lock); 318 } 319 320 void throttle_group_restart_bs(BlockDriverState *bs) 321 { 322 int i; 323 324 for (i = 0; i < 2; i++) { 325 while (qemu_co_enter_next(&bs->throttled_reqs[i])) { 326 ; 327 } 328 } 329 } 330 331 /* Update the throttle configuration for a particular group. Similar 332 * to throttle_config(), but guarantees atomicity within the 333 * throttling group. 334 * 335 * @bs: a BlockDriverState that is member of the group 336 * @cfg: the configuration to set 337 */ 338 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg) 339 { 340 ThrottleTimers *tt = &bs->throttle_timers; 341 ThrottleState *ts = bs->throttle_state; 342 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 343 qemu_mutex_lock(&tg->lock); 344 /* throttle_config() cancels the timers */ 345 if (timer_pending(tt->timers[0])) { 346 tg->any_timer_armed[0] = false; 347 } 348 if (timer_pending(tt->timers[1])) { 349 tg->any_timer_armed[1] = false; 350 } 351 throttle_config(ts, tt, cfg); 352 qemu_mutex_unlock(&tg->lock); 353 354 qemu_co_enter_next(&bs->throttled_reqs[0]); 355 qemu_co_enter_next(&bs->throttled_reqs[1]); 356 } 357 358 /* Get the throttle configuration from a particular group. Similar to 359 * throttle_get_config(), but guarantees atomicity within the 360 * throttling group. 361 * 362 * @bs: a BlockDriverState that is member of the group 363 * @cfg: the configuration will be written here 364 */ 365 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg) 366 { 367 ThrottleState *ts = bs->throttle_state; 368 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 369 qemu_mutex_lock(&tg->lock); 370 throttle_get_config(ts, cfg); 371 qemu_mutex_unlock(&tg->lock); 372 } 373 374 /* ThrottleTimers callback. This wakes up a request that was waiting 375 * because it had been throttled. 376 * 377 * @bs: the BlockDriverState whose request had been throttled 378 * @is_write: the type of operation (read/write) 379 */ 380 static void timer_cb(BlockDriverState *bs, bool is_write) 381 { 382 ThrottleState *ts = bs->throttle_state; 383 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 384 bool empty_queue; 385 386 /* The timer has just been fired, so we can update the flag */ 387 qemu_mutex_lock(&tg->lock); 388 tg->any_timer_armed[is_write] = false; 389 qemu_mutex_unlock(&tg->lock); 390 391 /* Run the request that was waiting for this timer */ 392 empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]); 393 394 /* If the request queue was empty then we have to take care of 395 * scheduling the next one */ 396 if (empty_queue) { 397 qemu_mutex_lock(&tg->lock); 398 schedule_next_request(bs, is_write); 399 qemu_mutex_unlock(&tg->lock); 400 } 401 } 402 403 static void read_timer_cb(void *opaque) 404 { 405 timer_cb(opaque, false); 406 } 407 408 static void write_timer_cb(void *opaque) 409 { 410 timer_cb(opaque, true); 411 } 412 413 /* Register a BlockDriverState in the throttling group, also 414 * initializing its timers and updating its throttle_state pointer to 415 * point to it. If a throttling group with that name does not exist 416 * yet, it will be created. 417 * 418 * @bs: the BlockDriverState to insert 419 * @groupname: the name of the group 420 */ 421 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname) 422 { 423 int i; 424 ThrottleState *ts = throttle_group_incref(groupname); 425 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 426 int clock_type = QEMU_CLOCK_REALTIME; 427 428 if (qtest_enabled()) { 429 /* For testing block IO throttling only */ 430 clock_type = QEMU_CLOCK_VIRTUAL; 431 } 432 433 bs->throttle_state = ts; 434 435 qemu_mutex_lock(&tg->lock); 436 /* If the ThrottleGroup is new set this BlockDriverState as the token */ 437 for (i = 0; i < 2; i++) { 438 if (!tg->tokens[i]) { 439 tg->tokens[i] = bs; 440 } 441 } 442 443 QLIST_INSERT_HEAD(&tg->head, bs, round_robin); 444 445 throttle_timers_init(&bs->throttle_timers, 446 bdrv_get_aio_context(bs), 447 clock_type, 448 read_timer_cb, 449 write_timer_cb, 450 bs); 451 452 qemu_mutex_unlock(&tg->lock); 453 } 454 455 /* Unregister a BlockDriverState from its group, removing it from the 456 * list, destroying the timers and setting the throttle_state pointer 457 * to NULL. 458 * 459 * The BlockDriverState must not have pending throttled requests, so 460 * the caller has to drain them first. 461 * 462 * The group will be destroyed if it's empty after this operation. 463 * 464 * @bs: the BlockDriverState to remove 465 */ 466 void throttle_group_unregister_bs(BlockDriverState *bs) 467 { 468 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 469 int i; 470 471 assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0); 472 assert(qemu_co_queue_empty(&bs->throttled_reqs[0])); 473 assert(qemu_co_queue_empty(&bs->throttled_reqs[1])); 474 475 qemu_mutex_lock(&tg->lock); 476 for (i = 0; i < 2; i++) { 477 if (tg->tokens[i] == bs) { 478 BlockDriverState *token = throttle_group_next_bs(bs); 479 /* Take care of the case where this is the last bs in the group */ 480 if (token == bs) { 481 token = NULL; 482 } 483 tg->tokens[i] = token; 484 } 485 } 486 487 /* remove the current bs from the list */ 488 QLIST_REMOVE(bs, round_robin); 489 throttle_timers_destroy(&bs->throttle_timers); 490 qemu_mutex_unlock(&tg->lock); 491 492 throttle_group_unref(&tg->ts); 493 bs->throttle_state = NULL; 494 } 495 496 static void throttle_groups_init(void) 497 { 498 qemu_mutex_init(&throttle_groups_lock); 499 } 500 501 block_init(throttle_groups_init); 502