1 /* 2 * QEMU block throttling group infrastructure 3 * 4 * Copyright (C) Nodalink, EURL. 2014 5 * Copyright (C) Igalia, S.L. 2015 6 * 7 * Authors: 8 * Benoît Canet <benoit.canet@nodalink.com> 9 * Alberto Garcia <berto@igalia.com> 10 * 11 * This program is free software; you can redistribute it and/or 12 * modify it under the terms of the GNU General Public License as 13 * published by the Free Software Foundation; either version 2 or 14 * (at your option) version 3 of the License. 15 * 16 * This program is distributed in the hope that it will be useful, 17 * but WITHOUT ANY WARRANTY; without even the implied warranty of 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19 * GNU General Public License for more details. 20 * 21 * You should have received a copy of the GNU General Public License 22 * along with this program; if not, see <http://www.gnu.org/licenses/>. 23 */ 24 25 #include "qemu/osdep.h" 26 #include "block/throttle-groups.h" 27 #include "qemu/queue.h" 28 #include "qemu/thread.h" 29 #include "sysemu/qtest.h" 30 31 /* The ThrottleGroup structure (with its ThrottleState) is shared 32 * among different BlockDriverState and it's independent from 33 * AioContext, so in order to use it from different threads it needs 34 * its own locking. 35 * 36 * This locking is however handled internally in this file, so it's 37 * transparent to outside users. 38 * 39 * The whole ThrottleGroup structure is private and invisible to 40 * outside users, that only use it through its ThrottleState. 41 * 42 * In addition to the ThrottleGroup structure, BlockDriverState has 43 * fields that need to be accessed by other members of the group and 44 * therefore also need to be protected by this lock. Once a BDS is 45 * registered in a group those fields can be accessed by other threads 46 * any time. 47 * 48 * Again, all this is handled internally and is mostly transparent to 49 * the outside. 
 * The 'throttle_timers' field however has an additional
 * constraint because it may be temporarily invalid (see for example
 * bdrv_set_aio_context()). Therefore in this file a thread will
 * access some other BDS's timers only after verifying that that BDS
 * has throttled requests in the queue.
 */
typedef struct ThrottleGroup {
    char *name; /* This is constant during the lifetime of the group */

    QemuMutex lock; /* This lock protects the following four fields */
    ThrottleState ts;
    QLIST_HEAD(, BlockDriverState) head;
    BlockDriverState *tokens[2];   /* current round-robin token, per read/write */
    bool any_timer_armed[2];       /* is any timer in the group armed, per read/write */

    /* These two are protected by the global throttle_groups_lock */
    unsigned refcount;
    QTAILQ_ENTRY(ThrottleGroup) list;
} ThrottleGroup;

/* Protects the list of groups below and each group's refcount */
static QemuMutex throttle_groups_lock;
static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
    QTAILQ_HEAD_INITIALIZER(throttle_groups);

/* Increments the reference count of a ThrottleGroup given its name.
 *
 * If no ThrottleGroup is found with the given name a new one is
 * created.
 *
 * @name: the name of the ThrottleGroup
 * @ret: the ThrottleState member of the ThrottleGroup
 */
ThrottleState *throttle_group_incref(const char *name)
{
    ThrottleGroup *tg = NULL;
    ThrottleGroup *iter;

    qemu_mutex_lock(&throttle_groups_lock);

    /* Look for an existing group with that name */
    QTAILQ_FOREACH(iter, &throttle_groups, list) {
        if (!strcmp(name, iter->name)) {
            tg = iter;
            break;
        }
    }

    /* Create a new one if not found */
    if (!tg) {
        tg = g_new0(ThrottleGroup, 1);
        tg->name = g_strdup(name);
        qemu_mutex_init(&tg->lock);
        throttle_init(&tg->ts);
        QLIST_INIT(&tg->head);

        QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
    }

    tg->refcount++;

    qemu_mutex_unlock(&throttle_groups_lock);

    return &tg->ts;
}

/* Decrease the reference count of a ThrottleGroup.
115 * 116 * When the reference count reaches zero the ThrottleGroup is 117 * destroyed. 118 * 119 * @ts: The ThrottleGroup to unref, given by its ThrottleState member 120 */ 121 void throttle_group_unref(ThrottleState *ts) 122 { 123 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 124 125 qemu_mutex_lock(&throttle_groups_lock); 126 if (--tg->refcount == 0) { 127 QTAILQ_REMOVE(&throttle_groups, tg, list); 128 qemu_mutex_destroy(&tg->lock); 129 g_free(tg->name); 130 g_free(tg); 131 } 132 qemu_mutex_unlock(&throttle_groups_lock); 133 } 134 135 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and 136 * the pointer) is guaranteed to remain constant during the lifetime 137 * of the group. 138 * 139 * @bs: a BlockDriverState that is member of a throttling group 140 * @ret: the name of the group. 141 */ 142 const char *throttle_group_get_name(BlockDriverState *bs) 143 { 144 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 145 return tg->name; 146 } 147 148 /* Return the next BlockDriverState in the round-robin sequence, 149 * simulating a circular list. 150 * 151 * This assumes that tg->lock is held. 152 * 153 * @bs: the current BlockDriverState 154 * @ret: the next BlockDriverState in the sequence 155 */ 156 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs) 157 { 158 ThrottleState *ts = bs->throttle_state; 159 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 160 BlockDriverState *next = QLIST_NEXT(bs, round_robin); 161 162 if (!next) { 163 return QLIST_FIRST(&tg->head); 164 } 165 166 return next; 167 } 168 169 /* Return the next BlockDriverState in the round-robin sequence with 170 * pending I/O requests. 171 * 172 * This assumes that tg->lock is held. 173 * 174 * @bs: the current BlockDriverState 175 * @is_write: the type of operation (read/write) 176 * @ret: the next BlockDriverState with pending requests, or bs 177 * if there is none. 
178 */ 179 static BlockDriverState *next_throttle_token(BlockDriverState *bs, 180 bool is_write) 181 { 182 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 183 BlockDriverState *token, *start; 184 185 start = token = tg->tokens[is_write]; 186 187 /* get next bs round in round robin style */ 188 token = throttle_group_next_bs(token); 189 while (token != start && !token->pending_reqs[is_write]) { 190 token = throttle_group_next_bs(token); 191 } 192 193 /* If no IO are queued for scheduling on the next round robin token 194 * then decide the token is the current bs because chances are 195 * the current bs get the current request queued. 196 */ 197 if (token == start && !token->pending_reqs[is_write]) { 198 token = bs; 199 } 200 201 return token; 202 } 203 204 /* Check if the next I/O request for a BlockDriverState needs to be 205 * throttled or not. If there's no timer set in this group, set one 206 * and update the token accordingly. 207 * 208 * This assumes that tg->lock is held. 209 * 210 * @bs: the current BlockDriverState 211 * @is_write: the type of operation (read/write) 212 * @ret: whether the I/O request needs to be throttled or not 213 */ 214 static bool throttle_group_schedule_timer(BlockDriverState *bs, 215 bool is_write) 216 { 217 ThrottleState *ts = bs->throttle_state; 218 ThrottleTimers *tt = &bs->throttle_timers; 219 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 220 bool must_wait; 221 222 /* Check if any of the timers in this group is already armed */ 223 if (tg->any_timer_armed[is_write]) { 224 return true; 225 } 226 227 must_wait = throttle_schedule_timer(ts, tt, is_write); 228 229 /* If a timer just got armed, set bs as the current token */ 230 if (must_wait) { 231 tg->tokens[is_write] = bs; 232 tg->any_timer_armed[is_write] = true; 233 } 234 235 return must_wait; 236 } 237 238 /* Look for the next pending I/O request and schedule it. 239 * 240 * This assumes that tg->lock is held. 
 *
 * @bs: the current BlockDriverState
 * @is_write: the type of operation (read/write)
 */
static void schedule_next_request(BlockDriverState *bs, bool is_write)
{
    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
    bool must_wait;
    BlockDriverState *token;

    /* Check if there's any pending request to schedule next */
    token = next_throttle_token(bs, is_write);
    if (!token->pending_reqs[is_write]) {
        return;
    }

    /* Set a timer for the request if it needs to be throttled */
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* If it doesn't have to wait, queue it for immediate execution */
    if (!must_wait) {
        /* Give preference to requests from the current bs: if one is
         * queued here we can wake it directly from this coroutine */
        if (qemu_in_coroutine() &&
            qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
            token = bs;
        } else {
            /* Arm the token's timer to fire almost immediately
             * (now + 1 ns), so the request is woken up via timer_cb.
             * NOTE(review): presumably this is done so the request
             * runs in the token BDS's own AioContext — confirm. */
            ThrottleTimers *tt = &token->throttle_timers;
            int64_t now = qemu_clock_get_ns(tt->clock_type);
            timer_mod(tt->timers[is_write], now + 1);
            tg->any_timer_armed[is_write] = true;
        }
        tg->tokens[is_write] = token;
    }
}

/* Check if an I/O request needs to be throttled, wait and set a timer
 * if necessary, and schedule the next request using a round robin
 * algorithm.
 *
 * @bs: the current BlockDriverState
 * @bytes: the number of bytes for this I/O
 * @is_write: the type of operation (read/write)
 */
void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
                                                        unsigned int bytes,
                                                        bool is_write)
{
    bool must_wait;
    BlockDriverState *token;

    ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);

    /* First we check if this I/O has to be throttled.
 */
    token = next_throttle_token(bs, is_write);
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* Wait if there's a timer set or queued requests of this type.
     * The group lock is dropped while this coroutine sleeps on the
     * queue; pending_reqs tells other threads that bs has throttled
     * requests (and therefore valid timers). */
    if (must_wait || bs->pending_reqs[is_write]) {
        bs->pending_reqs[is_write]++;
        qemu_mutex_unlock(&tg->lock);
        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
        qemu_mutex_lock(&tg->lock);
        bs->pending_reqs[is_write]--;
    }

    /* The I/O will be executed, so do the accounting */
    throttle_account(bs->throttle_state, is_write, bytes);

    /* Schedule the next request */
    schedule_next_request(bs, is_write);

    qemu_mutex_unlock(&tg->lock);
}

/* Update the throttle configuration for a particular group. Similar
 * to throttle_config(), but guarantees atomicity within the
 * throttling group.
 *
 * @bs: a BlockDriverState that is member of the group
 * @cfg: the configuration to set
 */
void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
{
    ThrottleTimers *tt = &bs->throttle_timers;
    ThrottleState *ts = bs->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);
    /* throttle_config() cancels the timers, so clear the armed flags
     * for any of bs's timers that are currently pending */
    if (timer_pending(tt->timers[0])) {
        tg->any_timer_armed[0] = false;
    }
    if (timer_pending(tt->timers[1])) {
        tg->any_timer_armed[1] = false;
    }
    throttle_config(ts, tt, cfg);
    qemu_mutex_unlock(&tg->lock);
}

/* Get the throttle configuration from a particular group. Similar to
 * throttle_get_config(), but guarantees atomicity within the
 * throttling group.
343 * 344 * @bs: a BlockDriverState that is member of the group 345 * @cfg: the configuration will be written here 346 */ 347 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg) 348 { 349 ThrottleState *ts = bs->throttle_state; 350 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 351 qemu_mutex_lock(&tg->lock); 352 throttle_get_config(ts, cfg); 353 qemu_mutex_unlock(&tg->lock); 354 } 355 356 /* ThrottleTimers callback. This wakes up a request that was waiting 357 * because it had been throttled. 358 * 359 * @bs: the BlockDriverState whose request had been throttled 360 * @is_write: the type of operation (read/write) 361 */ 362 static void timer_cb(BlockDriverState *bs, bool is_write) 363 { 364 ThrottleState *ts = bs->throttle_state; 365 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 366 bool empty_queue; 367 368 /* The timer has just been fired, so we can update the flag */ 369 qemu_mutex_lock(&tg->lock); 370 tg->any_timer_armed[is_write] = false; 371 qemu_mutex_unlock(&tg->lock); 372 373 /* Run the request that was waiting for this timer */ 374 empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]); 375 376 /* If the request queue was empty then we have to take care of 377 * scheduling the next one */ 378 if (empty_queue) { 379 qemu_mutex_lock(&tg->lock); 380 schedule_next_request(bs, is_write); 381 qemu_mutex_unlock(&tg->lock); 382 } 383 } 384 385 static void read_timer_cb(void *opaque) 386 { 387 timer_cb(opaque, false); 388 } 389 390 static void write_timer_cb(void *opaque) 391 { 392 timer_cb(opaque, true); 393 } 394 395 /* Register a BlockDriverState in the throttling group, also 396 * initializing its timers and updating its throttle_state pointer to 397 * point to it. If a throttling group with that name does not exist 398 * yet, it will be created. 
399 * 400 * @bs: the BlockDriverState to insert 401 * @groupname: the name of the group 402 */ 403 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname) 404 { 405 int i; 406 ThrottleState *ts = throttle_group_incref(groupname); 407 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 408 int clock_type = QEMU_CLOCK_REALTIME; 409 410 if (qtest_enabled()) { 411 /* For testing block IO throttling only */ 412 clock_type = QEMU_CLOCK_VIRTUAL; 413 } 414 415 bs->throttle_state = ts; 416 417 qemu_mutex_lock(&tg->lock); 418 /* If the ThrottleGroup is new set this BlockDriverState as the token */ 419 for (i = 0; i < 2; i++) { 420 if (!tg->tokens[i]) { 421 tg->tokens[i] = bs; 422 } 423 } 424 425 QLIST_INSERT_HEAD(&tg->head, bs, round_robin); 426 427 throttle_timers_init(&bs->throttle_timers, 428 bdrv_get_aio_context(bs), 429 clock_type, 430 read_timer_cb, 431 write_timer_cb, 432 bs); 433 434 qemu_mutex_unlock(&tg->lock); 435 } 436 437 /* Unregister a BlockDriverState from its group, removing it from the 438 * list, destroying the timers and setting the throttle_state pointer 439 * to NULL. 440 * 441 * The BlockDriverState must not have pending throttled requests, so 442 * the caller has to drain them first. 443 * 444 * The group will be destroyed if it's empty after this operation. 
445 * 446 * @bs: the BlockDriverState to remove 447 */ 448 void throttle_group_unregister_bs(BlockDriverState *bs) 449 { 450 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 451 int i; 452 453 assert(bs->pending_reqs[0] == 0 && bs->pending_reqs[1] == 0); 454 assert(qemu_co_queue_empty(&bs->throttled_reqs[0])); 455 assert(qemu_co_queue_empty(&bs->throttled_reqs[1])); 456 457 qemu_mutex_lock(&tg->lock); 458 for (i = 0; i < 2; i++) { 459 if (tg->tokens[i] == bs) { 460 BlockDriverState *token = throttle_group_next_bs(bs); 461 /* Take care of the case where this is the last bs in the group */ 462 if (token == bs) { 463 token = NULL; 464 } 465 tg->tokens[i] = token; 466 } 467 } 468 469 /* remove the current bs from the list */ 470 QLIST_REMOVE(bs, round_robin); 471 throttle_timers_destroy(&bs->throttle_timers); 472 qemu_mutex_unlock(&tg->lock); 473 474 throttle_group_unref(&tg->ts); 475 bs->throttle_state = NULL; 476 } 477 478 static void throttle_groups_init(void) 479 { 480 qemu_mutex_init(&throttle_groups_lock); 481 } 482 483 block_init(throttle_groups_init); 484