1 /* 2 * QEMU block throttling group infrastructure 3 * 4 * Copyright (C) Nodalink, EURL. 2014 5 * Copyright (C) Igalia, S.L. 2015 6 * 7 * Authors: 8 * Benoît Canet <benoit.canet@nodalink.com> 9 * Alberto Garcia <berto@igalia.com> 10 * 11 * This program is free software; you can redistribute it and/or 12 * modify it under the terms of the GNU General Public License as 13 * published by the Free Software Foundation; either version 2 or 14 * (at your option) version 3 of the License. 15 * 16 * This program is distributed in the hope that it will be useful, 17 * but WITHOUT ANY WARRANTY; without even the implied warranty of 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19 * GNU General Public License for more details. 20 * 21 * You should have received a copy of the GNU General Public License 22 * along with this program; if not, see <http://www.gnu.org/licenses/>. 23 */ 24 25 #include "block/throttle-groups.h" 26 #include "qemu/queue.h" 27 #include "qemu/thread.h" 28 #include "sysemu/qtest.h" 29 30 /* The ThrottleGroup structure (with its ThrottleState) is shared 31 * among different BlockDriverState and it's independent from 32 * AioContext, so in order to use it from different threads it needs 33 * its own locking. 34 * 35 * This locking is however handled internally in this file, so it's 36 * transparent to outside users. 37 * 38 * The whole ThrottleGroup structure is private and invisible to 39 * outside users, that only use it through its ThrottleState. 40 * 41 * In addition to the ThrottleGroup structure, BlockDriverState has 42 * fields that need to be accessed by other members of the group and 43 * therefore also need to be protected by this lock. Once a BDS is 44 * registered in a group those fields can be accessed by other threads 45 * any time. 46 * 47 * Again, all this is handled internally and is mostly transparent to 48 * the outside. The 'throttle_timers' field however has an additional 49 * constraint because it may be temporarily invalid (see for example 50 * bdrv_set_aio_context()). Therefore in this file a thread will 51 * access some other BDS's timers only after verifying that that BDS 52 * has throttled requests in the queue. 53 */ 54 typedef struct ThrottleGroup { 55 char *name; /* This is constant during the lifetime of the group */ 56 57 QemuMutex lock; /* This lock protects the following four fields */ 58 ThrottleState ts; 59 QLIST_HEAD(, BlockDriverState) head; 60 BlockDriverState *tokens[2]; 61 bool any_timer_armed[2]; 62 63 /* These two are protected by the global throttle_groups_lock */ 64 unsigned refcount; 65 QTAILQ_ENTRY(ThrottleGroup) list; 66 } ThrottleGroup; 67 68 static QemuMutex throttle_groups_lock; 69 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups = 70 QTAILQ_HEAD_INITIALIZER(throttle_groups); 71 72 /* Increments the reference count of a ThrottleGroup given its name. 73 * 74 * If no ThrottleGroup is found with the given name a new one is 75 * created. 76 * 77 * @name: the name of the ThrottleGroup 78 * @ret: the ThrottleState member of the ThrottleGroup 79 */ 80 ThrottleState *throttle_group_incref(const char *name) 81 { 82 ThrottleGroup *tg = NULL; 83 ThrottleGroup *iter; 84 85 qemu_mutex_lock(&throttle_groups_lock); 86 87 /* Look for an existing group with that name */ 88 QTAILQ_FOREACH(iter, &throttle_groups, list) { 89 if (!strcmp(name, iter->name)) { 90 tg = iter; 91 break; 92 } 93 } 94 95 /* Create a new one if not found */ 96 if (!tg) { 97 tg = g_new0(ThrottleGroup, 1); 98 tg->name = g_strdup(name); 99 qemu_mutex_init(&tg->lock); 100 throttle_init(&tg->ts); 101 QLIST_INIT(&tg->head); 102 103 QTAILQ_INSERT_TAIL(&throttle_groups, tg, list); 104 } 105 106 tg->refcount++; 107 108 qemu_mutex_unlock(&throttle_groups_lock); 109 110 return &tg->ts; 111 } 112 113 /* Decrease the reference count of a ThrottleGroup. 114 * 115 * When the reference count reaches zero the ThrottleGroup is 116 * destroyed. 117 * 118 * @ts: The ThrottleGroup to unref, given by its ThrottleState member 119 */ 120 void throttle_group_unref(ThrottleState *ts) 121 { 122 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 123 124 qemu_mutex_lock(&throttle_groups_lock); 125 if (--tg->refcount == 0) { 126 QTAILQ_REMOVE(&throttle_groups, tg, list); 127 qemu_mutex_destroy(&tg->lock); 128 g_free(tg->name); 129 g_free(tg); 130 } 131 qemu_mutex_unlock(&throttle_groups_lock); 132 } 133 134 /* Get the name from a BlockDriverState's ThrottleGroup. The name (and 135 * the pointer) is guaranteed to remain constant during the lifetime 136 * of the group. 137 * 138 * @bs: a BlockDriverState that is member of a throttling group 139 * @ret: the name of the group. 140 */ 141 const char *throttle_group_get_name(BlockDriverState *bs) 142 { 143 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 144 return tg->name; 145 } 146 147 /* Return the next BlockDriverState in the round-robin sequence, 148 * simulating a circular list. 149 * 150 * This assumes that tg->lock is held. 151 * 152 * @bs: the current BlockDriverState 153 * @ret: the next BlockDriverState in the sequence 154 */ 155 static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs) 156 { 157 ThrottleState *ts = bs->throttle_state; 158 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 159 BlockDriverState *next = QLIST_NEXT(bs, round_robin); 160 161 if (!next) { 162 return QLIST_FIRST(&tg->head); 163 } 164 165 return next; 166 } 167 168 /* Return the next BlockDriverState in the round-robin sequence with 169 * pending I/O requests. 170 * 171 * This assumes that tg->lock is held. 172 * 173 * @bs: the current BlockDriverState 174 * @is_write: the type of operation (read/write) 175 * @ret: the next BlockDriverState with pending requests, or bs 176 * if there is none. 177 */ 178 static BlockDriverState *next_throttle_token(BlockDriverState *bs, 179 bool is_write) 180 { 181 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 182 BlockDriverState *token, *start; 183 184 start = token = tg->tokens[is_write]; 185 186 /* get next bs round in round robin style */ 187 token = throttle_group_next_bs(token); 188 while (token != start && !token->pending_reqs[is_write]) { 189 token = throttle_group_next_bs(token); 190 } 191 192 /* If no IO are queued for scheduling on the next round robin token 193 * then decide the token is the current bs because chances are 194 * the current bs get the current request queued. 195 */ 196 if (token == start && !token->pending_reqs[is_write]) { 197 token = bs; 198 } 199 200 return token; 201 } 202 203 /* Check if the next I/O request for a BlockDriverState needs to be 204 * throttled or not. If there's no timer set in this group, set one 205 * and update the token accordingly. 206 * 207 * This assumes that tg->lock is held. 208 * 209 * @bs: the current BlockDriverState 210 * @is_write: the type of operation (read/write) 211 * @ret: whether the I/O request needs to be throttled or not 212 */ 213 static bool throttle_group_schedule_timer(BlockDriverState *bs, 214 bool is_write) 215 { 216 ThrottleState *ts = bs->throttle_state; 217 ThrottleTimers *tt = &bs->throttle_timers; 218 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 219 bool must_wait; 220 221 /* Check if any of the timers in this group is already armed */ 222 if (tg->any_timer_armed[is_write]) { 223 return true; 224 } 225 226 must_wait = throttle_schedule_timer(ts, tt, is_write); 227 228 /* If a timer just got armed, set bs as the current token */ 229 if (must_wait) { 230 tg->tokens[is_write] = bs; 231 tg->any_timer_armed[is_write] = true; 232 } 233 234 return must_wait; 235 } 236 237 /* Look for the next pending I/O request and schedule it. 238 * 239 * This assumes that tg->lock is held. 240 * 241 * @bs: the current BlockDriverState 242 * @is_write: the type of operation (read/write) 243 */ 244 static void schedule_next_request(BlockDriverState *bs, bool is_write) 245 { 246 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 247 bool must_wait; 248 BlockDriverState *token; 249 250 /* Check if there's any pending request to schedule next */ 251 token = next_throttle_token(bs, is_write); 252 if (!token->pending_reqs[is_write]) { 253 return; 254 } 255 256 /* Set a timer for the request if it needs to be throttled */ 257 must_wait = throttle_group_schedule_timer(token, is_write); 258 259 /* If it doesn't have to wait, queue it for immediate execution */ 260 if (!must_wait) { 261 /* Give preference to requests from the current bs */ 262 if (qemu_in_coroutine() && 263 qemu_co_queue_next(&bs->throttled_reqs[is_write])) { 264 token = bs; 265 } else { 266 ThrottleTimers *tt = &token->throttle_timers; 267 int64_t now = qemu_clock_get_ns(tt->clock_type); 268 timer_mod(tt->timers[is_write], now + 1); 269 tg->any_timer_armed[is_write] = true; 270 } 271 tg->tokens[is_write] = token; 272 } 273 } 274 275 /* Check if an I/O request needs to be throttled, wait and set a timer 276 * if necessary, and schedule the next request using a round robin 277 * algorithm. 278 * 279 * @bs: the current BlockDriverState 280 * @bytes: the number of bytes for this I/O 281 * @is_write: the type of operation (read/write) 282 */ 283 void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs, 284 unsigned int bytes, 285 bool is_write) 286 { 287 bool must_wait; 288 BlockDriverState *token; 289 290 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 291 qemu_mutex_lock(&tg->lock); 292 293 /* First we check if this I/O has to be throttled. */ 294 token = next_throttle_token(bs, is_write); 295 must_wait = throttle_group_schedule_timer(token, is_write); 296 297 /* Wait if there's a timer set or queued requests of this type */ 298 if (must_wait || bs->pending_reqs[is_write]) { 299 bs->pending_reqs[is_write]++; 300 qemu_mutex_unlock(&tg->lock); 301 qemu_co_queue_wait(&bs->throttled_reqs[is_write]); 302 qemu_mutex_lock(&tg->lock); 303 bs->pending_reqs[is_write]--; 304 } 305 306 /* The I/O will be executed, so do the accounting */ 307 throttle_account(bs->throttle_state, is_write, bytes); 308 309 /* Schedule the next request */ 310 schedule_next_request(bs, is_write); 311 312 qemu_mutex_unlock(&tg->lock); 313 } 314 315 /* Update the throttle configuration for a particular group. Similar 316 * to throttle_config(), but guarantees atomicity within the 317 * throttling group. 318 * 319 * @bs: a BlockDriverState that is member of the group 320 * @cfg: the configuration to set 321 */ 322 void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg) 323 { 324 ThrottleTimers *tt = &bs->throttle_timers; 325 ThrottleState *ts = bs->throttle_state; 326 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 327 qemu_mutex_lock(&tg->lock); 328 /* throttle_config() cancels the timers */ 329 if (timer_pending(tt->timers[0])) { 330 tg->any_timer_armed[0] = false; 331 } 332 if (timer_pending(tt->timers[1])) { 333 tg->any_timer_armed[1] = false; 334 } 335 throttle_config(ts, tt, cfg); 336 qemu_mutex_unlock(&tg->lock); 337 } 338 339 /* Get the throttle configuration from a particular group. Similar to 340 * throttle_get_config(), but guarantees atomicity within the 341 * throttling group. 342 * 343 * @bs: a BlockDriverState that is member of the group 344 * @cfg: the configuration will be written here 345 */ 346 void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg) 347 { 348 ThrottleState *ts = bs->throttle_state; 349 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 350 qemu_mutex_lock(&tg->lock); 351 throttle_get_config(ts, cfg); 352 qemu_mutex_unlock(&tg->lock); 353 } 354 355 /* ThrottleTimers callback. This wakes up a request that was waiting 356 * because it had been throttled. 357 * 358 * @bs: the BlockDriverState whose request had been throttled 359 * @is_write: the type of operation (read/write) 360 */ 361 static void timer_cb(BlockDriverState *bs, bool is_write) 362 { 363 ThrottleState *ts = bs->throttle_state; 364 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 365 bool empty_queue; 366 367 /* The timer has just been fired, so we can update the flag */ 368 qemu_mutex_lock(&tg->lock); 369 tg->any_timer_armed[is_write] = false; 370 qemu_mutex_unlock(&tg->lock); 371 372 /* Run the request that was waiting for this timer */ 373 empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]); 374 375 /* If the request queue was empty then we have to take care of 376 * scheduling the next one */ 377 if (empty_queue) { 378 qemu_mutex_lock(&tg->lock); 379 schedule_next_request(bs, is_write); 380 qemu_mutex_unlock(&tg->lock); 381 } 382 } 383 384 static void read_timer_cb(void *opaque) 385 { 386 timer_cb(opaque, false); 387 } 388 389 static void write_timer_cb(void *opaque) 390 { 391 timer_cb(opaque, true); 392 } 393 394 /* Register a BlockDriverState in the throttling group, also 395 * initializing its timers and updating its throttle_state pointer to 396 * point to it. If a throttling group with that name does not exist 397 * yet, it will be created. 398 * 399 * @bs: the BlockDriverState to insert 400 * @groupname: the name of the group 401 */ 402 void throttle_group_register_bs(BlockDriverState *bs, const char *groupname) 403 { 404 int i; 405 ThrottleState *ts = throttle_group_incref(groupname); 406 ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts); 407 int clock_type = QEMU_CLOCK_REALTIME; 408 409 if (qtest_enabled()) { 410 /* For testing block IO throttling only */ 411 clock_type = QEMU_CLOCK_VIRTUAL; 412 } 413 414 bs->throttle_state = ts; 415 416 qemu_mutex_lock(&tg->lock); 417 /* If the ThrottleGroup is new set this BlockDriverState as the token */ 418 for (i = 0; i < 2; i++) { 419 if (!tg->tokens[i]) { 420 tg->tokens[i] = bs; 421 } 422 } 423 424 QLIST_INSERT_HEAD(&tg->head, bs, round_robin); 425 426 throttle_timers_init(&bs->throttle_timers, 427 bdrv_get_aio_context(bs), 428 clock_type, 429 read_timer_cb, 430 write_timer_cb, 431 bs); 432 433 qemu_mutex_unlock(&tg->lock); 434 } 435 436 /* Unregister a BlockDriverState from its group, removing it from the 437 * list, destroying the timers and setting the throttle_state pointer 438 * to NULL. 439 * 440 * The group will be destroyed if it's empty after this operation. 441 * 442 * @bs: the BlockDriverState to remove 443 */ 444 void throttle_group_unregister_bs(BlockDriverState *bs) 445 { 446 ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts); 447 int i; 448 449 qemu_mutex_lock(&tg->lock); 450 for (i = 0; i < 2; i++) { 451 if (tg->tokens[i] == bs) { 452 BlockDriverState *token = throttle_group_next_bs(bs); 453 /* Take care of the case where this is the last bs in the group */ 454 if (token == bs) { 455 token = NULL; 456 } 457 tg->tokens[i] = token; 458 } 459 } 460 461 /* remove the current bs from the list */ 462 QLIST_REMOVE(bs, round_robin); 463 throttle_timers_destroy(&bs->throttle_timers); 464 qemu_mutex_unlock(&tg->lock); 465 466 throttle_group_unref(&tg->ts); 467 bs->throttle_state = NULL; 468 } 469 470 static void throttle_groups_init(void) 471 { 472 qemu_mutex_init(&throttle_groups_lock); 473 } 474 475 block_init(throttle_groups_init); 476