xref: /openbmc/qemu/block/throttle-groups.c (revision e2dbca03)
1 /*
2  * QEMU block throttling group infrastructure
3  *
4  * Copyright (C) Nodalink, EURL. 2014
5  * Copyright (C) Igalia, S.L. 2015
6  *
7  * Authors:
8  *   Benoît Canet <benoit.canet@nodalink.com>
9  *   Alberto Garcia <berto@igalia.com>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 or
14  * (at your option) version 3 of the License.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, see <http://www.gnu.org/licenses/>.
23  */
24 
25 #include "qemu/osdep.h"
26 #include "sysemu/block-backend.h"
27 #include "block/throttle-groups.h"
28 #include "qemu/throttle-options.h"
29 #include "qemu/main-loop.h"
30 #include "qemu/queue.h"
31 #include "qemu/thread.h"
32 #include "sysemu/qtest.h"
33 #include "qapi/error.h"
34 #include "qapi/qapi-visit-block-core.h"
35 #include "qom/object.h"
36 #include "qom/object_interfaces.h"
37 
38 static void throttle_group_obj_init(Object *obj);
39 static void throttle_group_obj_complete(UserCreatable *obj, Error **errp);
40 static void timer_cb(ThrottleGroupMember *tgm, ThrottleDirection direction);
41 
42 /* The ThrottleGroup structure (with its ThrottleState) is shared
43  * among different ThrottleGroupMembers and it's independent from
44  * AioContext, so in order to use it from different threads it needs
45  * its own locking.
46  *
47  * This locking is however handled internally in this file, so it's
48  * transparent to outside users.
49  *
50  * The whole ThrottleGroup structure is private and invisible to
51  * outside users, that only use it through its ThrottleState.
52  *
53  * In addition to the ThrottleGroup structure, ThrottleGroupMember has
54  * fields that need to be accessed by other members of the group and
55  * therefore also need to be protected by this lock. Once a
56  * ThrottleGroupMember is registered in a group those fields can be accessed
57  * by other threads any time.
58  *
59  * Again, all this is handled internally and is mostly transparent to
60  * the outside. The 'throttle_timers' field however has an additional
61  * constraint because it may be temporarily invalid (see for example
62  * blk_set_aio_context()). Therefore in this file a thread will
63  * access some other ThrottleGroupMember's timers only after verifying that
64  * that ThrottleGroupMember has throttled requests in the queue.
65  */
66 struct ThrottleGroup {
67     Object parent_obj;
68 
69     /* refuse individual property change if initialization is complete */
70     bool is_initialized;
71     char *name; /* This is constant during the lifetime of the group */
72 
73     QemuMutex lock; /* This lock protects the following four fields */
74     ThrottleState ts;
75     QLIST_HEAD(, ThrottleGroupMember) head;
76     ThrottleGroupMember *tokens[THROTTLE_MAX];
77     bool any_timer_armed[THROTTLE_MAX];
78     QEMUClockType clock_type;
79 
80     /* This field is protected by the global QEMU mutex */
81     QTAILQ_ENTRY(ThrottleGroup) list;
82 };
83 
84 /* This is protected by the global QEMU mutex */
85 static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
86     QTAILQ_HEAD_INITIALIZER(throttle_groups);
87 
88 
89 /* This function reads throttle_groups and must be called under the global
90  * mutex.
91  */
throttle_group_by_name(const char * name)92 static ThrottleGroup *throttle_group_by_name(const char *name)
93 {
94     ThrottleGroup *iter;
95 
96     /* Look for an existing group with that name */
97     QTAILQ_FOREACH(iter, &throttle_groups, list) {
98         if (!g_strcmp0(name, iter->name)) {
99             return iter;
100         }
101     }
102 
103     return NULL;
104 }
105 
106 /* This function reads throttle_groups and must be called under the global
107  * mutex.
108  */
throttle_group_exists(const char * name)109 bool throttle_group_exists(const char *name)
110 {
111     return throttle_group_by_name(name) != NULL;
112 }
113 
114 /* Increments the reference count of a ThrottleGroup given its name.
115  *
116  * If no ThrottleGroup is found with the given name a new one is
117  * created.
118  *
119  * This function edits throttle_groups and must be called under the global
120  * mutex.
121  *
122  * @name: the name of the ThrottleGroup
123  * @ret:  the ThrottleState member of the ThrottleGroup
124  */
throttle_group_incref(const char * name)125 ThrottleState *throttle_group_incref(const char *name)
126 {
127     ThrottleGroup *tg = NULL;
128 
129     /* Look for an existing group with that name */
130     tg = throttle_group_by_name(name);
131 
132     if (tg) {
133         object_ref(OBJECT(tg));
134     } else {
135         /* Create a new one if not found */
136         /* new ThrottleGroup obj will have a refcnt = 1 */
137         tg = THROTTLE_GROUP(object_new(TYPE_THROTTLE_GROUP));
138         tg->name = g_strdup(name);
139         throttle_group_obj_complete(USER_CREATABLE(tg), &error_abort);
140     }
141 
142     return &tg->ts;
143 }
144 
145 /* Decrease the reference count of a ThrottleGroup.
146  *
147  * When the reference count reaches zero the ThrottleGroup is
148  * destroyed.
149  *
150  * This function edits throttle_groups and must be called under the global
151  * mutex.
152  *
153  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
154  */
throttle_group_unref(ThrottleState * ts)155 void throttle_group_unref(ThrottleState *ts)
156 {
157     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
158     object_unref(OBJECT(tg));
159 }
160 
161 /* Get the name from a ThrottleGroupMember's group. The name (and the pointer)
162  * is guaranteed to remain constant during the lifetime of the group.
163  *
164  * @tgm:  a ThrottleGroupMember
165  * @ret:  the name of the group.
166  */
throttle_group_get_name(ThrottleGroupMember * tgm)167 const char *throttle_group_get_name(ThrottleGroupMember *tgm)
168 {
169     ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
170     return tg->name;
171 }
172 
173 /* Return the next ThrottleGroupMember in the round-robin sequence, simulating
174  * a circular list.
175  *
176  * This assumes that tg->lock is held.
177  *
178  * @tgm: the current ThrottleGroupMember
179  * @ret: the next ThrottleGroupMember in the sequence
180  */
throttle_group_next_tgm(ThrottleGroupMember * tgm)181 static ThrottleGroupMember *throttle_group_next_tgm(ThrottleGroupMember *tgm)
182 {
183     ThrottleState *ts = tgm->throttle_state;
184     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
185     ThrottleGroupMember *next = QLIST_NEXT(tgm, round_robin);
186 
187     if (!next) {
188         next = QLIST_FIRST(&tg->head);
189     }
190 
191     return next;
192 }
193 
194 /*
195  * Return whether a ThrottleGroupMember has pending requests.
196  *
197  * This assumes that tg->lock is held.
198  *
199  * @tgm:        the ThrottleGroupMember
200  * @direction:  the ThrottleDirection
201  * @ret:        whether the ThrottleGroupMember has pending requests.
202  */
tgm_has_pending_reqs(ThrottleGroupMember * tgm,ThrottleDirection direction)203 static inline bool tgm_has_pending_reqs(ThrottleGroupMember *tgm,
204                                         ThrottleDirection direction)
205 {
206     return tgm->pending_reqs[direction];
207 }
208 
209 /* Return the next ThrottleGroupMember in the round-robin sequence with pending
210  * I/O requests.
211  *
212  * This assumes that tg->lock is held.
213  *
214  * @tgm:       the current ThrottleGroupMember
215  * @direction: the ThrottleDirection
216  * @ret:       the next ThrottleGroupMember with pending requests, or tgm if
217  *             there is none.
218  */
next_throttle_token(ThrottleGroupMember * tgm,ThrottleDirection direction)219 static ThrottleGroupMember *next_throttle_token(ThrottleGroupMember *tgm,
220                                                 ThrottleDirection direction)
221 {
222     ThrottleState *ts = tgm->throttle_state;
223     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
224     ThrottleGroupMember *token, *start;
225 
226     /* If this member has its I/O limits disabled then it means that
227      * it's being drained. Skip the round-robin search and return tgm
228      * immediately if it has pending requests. Otherwise we could be
229      * forcing it to wait for other member's throttled requests. */
230     if (tgm_has_pending_reqs(tgm, direction) &&
231         qatomic_read(&tgm->io_limits_disabled)) {
232         return tgm;
233     }
234 
235     start = token = tg->tokens[direction];
236 
237     /* get next bs round in round robin style */
238     token = throttle_group_next_tgm(token);
239     while (token != start && !tgm_has_pending_reqs(token, direction)) {
240         token = throttle_group_next_tgm(token);
241     }
242 
243     /* If no IO are queued for scheduling on the next round robin token
244      * then decide the token is the current tgm because chances are
245      * the current tgm got the current request queued.
246      */
247     if (token == start && !tgm_has_pending_reqs(token, direction)) {
248         token = tgm;
249     }
250 
251     /* Either we return the original TGM, or one with pending requests */
252     assert(token == tgm || tgm_has_pending_reqs(token, direction));
253 
254     return token;
255 }
256 
257 /* Check if the next I/O request for a ThrottleGroupMember needs to be
258  * throttled or not. If there's no timer set in this group, set one and update
259  * the token accordingly.
260  *
261  * This assumes that tg->lock is held.
262  *
263  * @tgm:        the current ThrottleGroupMember
264  * @direction:  the ThrottleDirection
265  * @ret:        whether the I/O request needs to be throttled or not
266  */
throttle_group_schedule_timer(ThrottleGroupMember * tgm,ThrottleDirection direction)267 static bool throttle_group_schedule_timer(ThrottleGroupMember *tgm,
268                                           ThrottleDirection direction)
269 {
270     ThrottleState *ts = tgm->throttle_state;
271     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
272     ThrottleTimers *tt = &tgm->throttle_timers;
273     bool must_wait;
274 
275     if (qatomic_read(&tgm->io_limits_disabled)) {
276         return false;
277     }
278 
279     /* Check if any of the timers in this group is already armed */
280     if (tg->any_timer_armed[direction]) {
281         return true;
282     }
283 
284     must_wait = throttle_schedule_timer(ts, tt, direction);
285 
286     /* If a timer just got armed, set tgm as the current token */
287     if (must_wait) {
288         tg->tokens[direction] = tgm;
289         tg->any_timer_armed[direction] = true;
290     }
291 
292     return must_wait;
293 }
294 
295 /* Start the next pending I/O request for a ThrottleGroupMember. Return whether
296  * any request was actually pending.
297  *
298  * @tgm:       the current ThrottleGroupMember
299  * @direction: the ThrottleDirection
300  */
throttle_group_co_restart_queue(ThrottleGroupMember * tgm,ThrottleDirection direction)301 static bool coroutine_fn throttle_group_co_restart_queue(ThrottleGroupMember *tgm,
302                                                          ThrottleDirection direction)
303 {
304     bool ret;
305 
306     qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
307     ret = qemu_co_queue_next(&tgm->throttled_reqs[direction]);
308     qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
309 
310     return ret;
311 }
312 
313 /* Look for the next pending I/O request and schedule it.
314  *
315  * This assumes that tg->lock is held.
316  *
317  * @tgm:       the current ThrottleGroupMember
318  * @direction: the ThrottleDirection
319  */
schedule_next_request(ThrottleGroupMember * tgm,ThrottleDirection direction)320 static void coroutine_mixed_fn schedule_next_request(ThrottleGroupMember *tgm,
321                                                      ThrottleDirection direction)
322 {
323     ThrottleState *ts = tgm->throttle_state;
324     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
325     bool must_wait;
326     ThrottleGroupMember *token;
327 
328     /* Check if there's any pending request to schedule next */
329     token = next_throttle_token(tgm, direction);
330     if (!tgm_has_pending_reqs(token, direction)) {
331         return;
332     }
333 
334     /* Set a timer for the request if it needs to be throttled */
335     must_wait = throttle_group_schedule_timer(token, direction);
336 
337     /* If it doesn't have to wait, queue it for immediate execution */
338     if (!must_wait) {
339         /* Give preference to requests from the current tgm */
340         if (qemu_in_coroutine() &&
341             throttle_group_co_restart_queue(tgm, direction)) {
342             token = tgm;
343         } else {
344             ThrottleTimers *tt = &token->throttle_timers;
345             int64_t now = qemu_clock_get_ns(tg->clock_type);
346             timer_mod(tt->timers[direction], now);
347             tg->any_timer_armed[direction] = true;
348         }
349         tg->tokens[direction] = token;
350     }
351 }
352 
353 /* Check if an I/O request needs to be throttled, wait and set a timer
354  * if necessary, and schedule the next request using a round robin
355  * algorithm.
356  *
357  * @tgm:       the current ThrottleGroupMember
358  * @bytes:     the number of bytes for this I/O
359  * @direction: the ThrottleDirection
360  */
throttle_group_co_io_limits_intercept(ThrottleGroupMember * tgm,int64_t bytes,ThrottleDirection direction)361 void coroutine_fn throttle_group_co_io_limits_intercept(ThrottleGroupMember *tgm,
362                                                         int64_t bytes,
363                                                         ThrottleDirection direction)
364 {
365     bool must_wait;
366     ThrottleGroupMember *token;
367     ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
368 
369     assert(bytes >= 0);
370     assert(direction < THROTTLE_MAX);
371 
372     qemu_mutex_lock(&tg->lock);
373 
374     /* First we check if this I/O has to be throttled. */
375     token = next_throttle_token(tgm, direction);
376     must_wait = throttle_group_schedule_timer(token, direction);
377 
378     /* Wait if there's a timer set or queued requests of this type */
379     if (must_wait || tgm->pending_reqs[direction]) {
380         tgm->pending_reqs[direction]++;
381         qemu_mutex_unlock(&tg->lock);
382         qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
383         qemu_co_queue_wait(&tgm->throttled_reqs[direction],
384                            &tgm->throttled_reqs_lock);
385         qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
386         qemu_mutex_lock(&tg->lock);
387         tgm->pending_reqs[direction]--;
388     }
389 
390     /* The I/O will be executed, so do the accounting */
391     throttle_account(tgm->throttle_state, direction, bytes);
392 
393     /* Schedule the next request */
394     schedule_next_request(tgm, direction);
395 
396     qemu_mutex_unlock(&tg->lock);
397 }
398 
399 typedef struct {
400     ThrottleGroupMember *tgm;
401     ThrottleDirection direction;
402 } RestartData;
403 
throttle_group_restart_queue_entry(void * opaque)404 static void coroutine_fn throttle_group_restart_queue_entry(void *opaque)
405 {
406     RestartData *data = opaque;
407     ThrottleGroupMember *tgm = data->tgm;
408     ThrottleState *ts = tgm->throttle_state;
409     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
410     ThrottleDirection direction = data->direction;
411     bool empty_queue;
412 
413     empty_queue = !throttle_group_co_restart_queue(tgm, direction);
414 
415     /* If the request queue was empty then we have to take care of
416      * scheduling the next one */
417     if (empty_queue) {
418         qemu_mutex_lock(&tg->lock);
419         schedule_next_request(tgm, direction);
420         qemu_mutex_unlock(&tg->lock);
421     }
422 
423     g_free(data);
424 
425     qatomic_dec(&tgm->restart_pending);
426     aio_wait_kick();
427 }
428 
throttle_group_restart_queue(ThrottleGroupMember * tgm,ThrottleDirection direction)429 static void throttle_group_restart_queue(ThrottleGroupMember *tgm,
430                                         ThrottleDirection direction)
431 {
432     Coroutine *co;
433     RestartData *rd = g_new0(RestartData, 1);
434 
435     rd->tgm = tgm;
436     rd->direction = direction;
437 
438     /* This function is called when a timer is fired or when
439      * throttle_group_restart_tgm() is called. Either way, there can
440      * be no timer pending on this tgm at this point */
441     assert(!timer_pending(tgm->throttle_timers.timers[direction]));
442 
443     qatomic_inc(&tgm->restart_pending);
444 
445     co = qemu_coroutine_create(throttle_group_restart_queue_entry, rd);
446     aio_co_enter(tgm->aio_context, co);
447 }
448 
throttle_group_restart_tgm(ThrottleGroupMember * tgm)449 void throttle_group_restart_tgm(ThrottleGroupMember *tgm)
450 {
451     ThrottleDirection dir;
452 
453     if (tgm->throttle_state) {
454         for (dir = THROTTLE_READ; dir < THROTTLE_MAX; dir++) {
455             QEMUTimer *t = tgm->throttle_timers.timers[dir];
456             if (timer_pending(t)) {
457                 /* If there's a pending timer on this tgm, fire it now */
458                 timer_del(t);
459                 timer_cb(tgm, dir);
460             } else {
461                 /* Else run the next request from the queue manually */
462                 throttle_group_restart_queue(tgm, dir);
463             }
464         }
465     }
466 }
467 
468 /* Update the throttle configuration for a particular group. Similar
469  * to throttle_config(), but guarantees atomicity within the
470  * throttling group.
471  *
472  * @tgm:    a ThrottleGroupMember that is a member of the group
473  * @cfg: the configuration to set
474  */
throttle_group_config(ThrottleGroupMember * tgm,ThrottleConfig * cfg)475 void throttle_group_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
476 {
477     ThrottleState *ts = tgm->throttle_state;
478     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
479     qemu_mutex_lock(&tg->lock);
480     throttle_config(ts, tg->clock_type, cfg);
481     qemu_mutex_unlock(&tg->lock);
482 
483     throttle_group_restart_tgm(tgm);
484 }
485 
486 /* Get the throttle configuration from a particular group. Similar to
487  * throttle_get_config(), but guarantees atomicity within the
488  * throttling group.
489  *
490  * @tgm:    a ThrottleGroupMember that is a member of the group
491  * @cfg: the configuration will be written here
492  */
throttle_group_get_config(ThrottleGroupMember * tgm,ThrottleConfig * cfg)493 void throttle_group_get_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
494 {
495     ThrottleState *ts = tgm->throttle_state;
496     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
497     qemu_mutex_lock(&tg->lock);
498     throttle_get_config(ts, cfg);
499     qemu_mutex_unlock(&tg->lock);
500 }
501 
502 /* ThrottleTimers callback. This wakes up a request that was waiting
503  * because it had been throttled.
504  *
505  * @tgm:       the ThrottleGroupMember whose request had been throttled
506  * @direction: the ThrottleDirection
507  */
timer_cb(ThrottleGroupMember * tgm,ThrottleDirection direction)508 static void timer_cb(ThrottleGroupMember *tgm, ThrottleDirection direction)
509 {
510     ThrottleState *ts = tgm->throttle_state;
511     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
512 
513     /* The timer has just been fired, so we can update the flag */
514     qemu_mutex_lock(&tg->lock);
515     tg->any_timer_armed[direction] = false;
516     qemu_mutex_unlock(&tg->lock);
517 
518     /* Run the request that was waiting for this timer */
519     throttle_group_restart_queue(tgm, direction);
520 }
521 
read_timer_cb(void * opaque)522 static void read_timer_cb(void *opaque)
523 {
524     timer_cb(opaque, THROTTLE_READ);
525 }
526 
write_timer_cb(void * opaque)527 static void write_timer_cb(void *opaque)
528 {
529     timer_cb(opaque, THROTTLE_WRITE);
530 }
531 
532 /* Register a ThrottleGroupMember from the throttling group, also initializing
533  * its timers and updating its throttle_state pointer to point to it. If a
534  * throttling group with that name does not exist yet, it will be created.
535  *
536  * This function edits throttle_groups and must be called under the global
537  * mutex.
538  *
539  * @tgm:       the ThrottleGroupMember to insert
540  * @groupname: the name of the group
541  * @ctx:       the AioContext to use
542  */
throttle_group_register_tgm(ThrottleGroupMember * tgm,const char * groupname,AioContext * ctx)543 void throttle_group_register_tgm(ThrottleGroupMember *tgm,
544                                  const char *groupname,
545                                  AioContext *ctx)
546 {
547     ThrottleDirection dir;
548     ThrottleState *ts = throttle_group_incref(groupname);
549     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
550 
551     tgm->throttle_state = ts;
552     tgm->aio_context = ctx;
553     qatomic_set(&tgm->restart_pending, 0);
554 
555     QEMU_LOCK_GUARD(&tg->lock);
556     /* If the ThrottleGroup is new set this ThrottleGroupMember as the token */
557     for (dir = THROTTLE_READ; dir < THROTTLE_MAX; dir++) {
558         if (!tg->tokens[dir]) {
559             tg->tokens[dir] = tgm;
560         }
561         qemu_co_queue_init(&tgm->throttled_reqs[dir]);
562     }
563 
564     QLIST_INSERT_HEAD(&tg->head, tgm, round_robin);
565 
566     throttle_timers_init(&tgm->throttle_timers,
567                          tgm->aio_context,
568                          tg->clock_type,
569                          read_timer_cb,
570                          write_timer_cb,
571                          tgm);
572     qemu_co_mutex_init(&tgm->throttled_reqs_lock);
573 }
574 
575 /* Unregister a ThrottleGroupMember from its group, removing it from the list,
576  * destroying the timers and setting the throttle_state pointer to NULL.
577  *
578  * The ThrottleGroupMember must not have pending throttled requests, so the
579  * caller has to drain them first.
580  *
581  * The group will be destroyed if it's empty after this operation.
582  *
583  * @tgm the ThrottleGroupMember to remove
584  */
throttle_group_unregister_tgm(ThrottleGroupMember * tgm)585 void throttle_group_unregister_tgm(ThrottleGroupMember *tgm)
586 {
587     ThrottleState *ts = tgm->throttle_state;
588     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
589     ThrottleGroupMember *token;
590     ThrottleDirection dir;
591 
592     if (!ts) {
593         /* Discard already unregistered tgm */
594         return;
595     }
596 
597     /* Wait for throttle_group_restart_queue_entry() coroutines to finish */
598     AIO_WAIT_WHILE(tgm->aio_context, qatomic_read(&tgm->restart_pending) > 0);
599 
600     WITH_QEMU_LOCK_GUARD(&tg->lock) {
601         for (dir = THROTTLE_READ; dir < THROTTLE_MAX; dir++) {
602             assert(tgm->pending_reqs[dir] == 0);
603             assert(qemu_co_queue_empty(&tgm->throttled_reqs[dir]));
604             assert(!timer_pending(tgm->throttle_timers.timers[dir]));
605             if (tg->tokens[dir] == tgm) {
606                 token = throttle_group_next_tgm(tgm);
607                 /* Take care of the case where this is the last tgm in the group */
608                 if (token == tgm) {
609                     token = NULL;
610                 }
611                 tg->tokens[dir] = token;
612             }
613         }
614 
615         /* remove the current tgm from the list */
616         QLIST_REMOVE(tgm, round_robin);
617         throttle_timers_destroy(&tgm->throttle_timers);
618     }
619 
620     throttle_group_unref(&tg->ts);
621     tgm->throttle_state = NULL;
622 }
623 
throttle_group_attach_aio_context(ThrottleGroupMember * tgm,AioContext * new_context)624 void throttle_group_attach_aio_context(ThrottleGroupMember *tgm,
625                                        AioContext *new_context)
626 {
627     ThrottleTimers *tt = &tgm->throttle_timers;
628     throttle_timers_attach_aio_context(tt, new_context);
629     tgm->aio_context = new_context;
630 }
631 
throttle_group_detach_aio_context(ThrottleGroupMember * tgm)632 void throttle_group_detach_aio_context(ThrottleGroupMember *tgm)
633 {
634     ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
635     ThrottleTimers *tt = &tgm->throttle_timers;
636     ThrottleDirection dir;
637 
638     /* Requests must have been drained */
639     for (dir = THROTTLE_READ; dir < THROTTLE_MAX; dir++) {
640         assert(tgm->pending_reqs[dir] == 0);
641         assert(qemu_co_queue_empty(&tgm->throttled_reqs[dir]));
642     }
643 
644     /* Kick off next ThrottleGroupMember, if necessary */
645     WITH_QEMU_LOCK_GUARD(&tg->lock) {
646         for (dir = THROTTLE_READ; dir < THROTTLE_MAX; dir++) {
647             if (timer_pending(tt->timers[dir])) {
648                 tg->any_timer_armed[dir] = false;
649                 schedule_next_request(tgm, dir);
650             }
651         }
652     }
653 
654     throttle_timers_detach_aio_context(tt);
655     tgm->aio_context = NULL;
656 }
657 
658 #undef THROTTLE_OPT_PREFIX
659 #define THROTTLE_OPT_PREFIX "x-"
660 
661 /* Helper struct and array for QOM property setter/getter */
662 typedef struct {
663     const char *name;
664     BucketType type;
665     enum {
666         AVG,
667         MAX,
668         BURST_LENGTH,
669         IOPS_SIZE,
670     } category;
671 } ThrottleParamInfo;
672 
673 static ThrottleParamInfo properties[] = {
674     {
675         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL,
676         THROTTLE_OPS_TOTAL, AVG,
677     },
678     {
679         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX,
680         THROTTLE_OPS_TOTAL, MAX,
681     },
682     {
683         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX_LENGTH,
684         THROTTLE_OPS_TOTAL, BURST_LENGTH,
685     },
686     {
687         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ,
688         THROTTLE_OPS_READ, AVG,
689     },
690     {
691         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX,
692         THROTTLE_OPS_READ, MAX,
693     },
694     {
695         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX_LENGTH,
696         THROTTLE_OPS_READ, BURST_LENGTH,
697     },
698     {
699         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE,
700         THROTTLE_OPS_WRITE, AVG,
701     },
702     {
703         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX,
704         THROTTLE_OPS_WRITE, MAX,
705     },
706     {
707         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX_LENGTH,
708         THROTTLE_OPS_WRITE, BURST_LENGTH,
709     },
710     {
711         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL,
712         THROTTLE_BPS_TOTAL, AVG,
713     },
714     {
715         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX,
716         THROTTLE_BPS_TOTAL, MAX,
717     },
718     {
719         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX_LENGTH,
720         THROTTLE_BPS_TOTAL, BURST_LENGTH,
721     },
722     {
723         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ,
724         THROTTLE_BPS_READ, AVG,
725     },
726     {
727         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX,
728         THROTTLE_BPS_READ, MAX,
729     },
730     {
731         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX_LENGTH,
732         THROTTLE_BPS_READ, BURST_LENGTH,
733     },
734     {
735         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE,
736         THROTTLE_BPS_WRITE, AVG,
737     },
738     {
739         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX,
740         THROTTLE_BPS_WRITE, MAX,
741     },
742     {
743         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX_LENGTH,
744         THROTTLE_BPS_WRITE, BURST_LENGTH,
745     },
746     {
747         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_SIZE,
748         0, IOPS_SIZE,
749     }
750 };
751 
752 /* This function edits throttle_groups and must be called under the global
753  * mutex */
throttle_group_obj_init(Object * obj)754 static void throttle_group_obj_init(Object *obj)
755 {
756     ThrottleGroup *tg = THROTTLE_GROUP(obj);
757 
758     tg->clock_type = QEMU_CLOCK_REALTIME;
759     if (qtest_enabled()) {
760         /* For testing block IO throttling only */
761         tg->clock_type = QEMU_CLOCK_VIRTUAL;
762     }
763     tg->is_initialized = false;
764     qemu_mutex_init(&tg->lock);
765     throttle_init(&tg->ts);
766     QLIST_INIT(&tg->head);
767 }
768 
769 /* This function edits throttle_groups and must be called under the global
770  * mutex */
throttle_group_obj_complete(UserCreatable * obj,Error ** errp)771 static void throttle_group_obj_complete(UserCreatable *obj, Error **errp)
772 {
773     ThrottleGroup *tg = THROTTLE_GROUP(obj);
774     ThrottleConfig cfg;
775 
776     /* set group name to object id if it exists */
777     if (!tg->name && tg->parent_obj.parent) {
778         tg->name = g_strdup(object_get_canonical_path_component(OBJECT(obj)));
779     }
780     /* We must have a group name at this point */
781     assert(tg->name);
782 
783     /* error if name is duplicate */
784     if (throttle_group_exists(tg->name)) {
785         error_setg(errp, "A group with this name already exists");
786         return;
787     }
788 
789     /* check validity */
790     throttle_get_config(&tg->ts, &cfg);
791     if (!throttle_is_valid(&cfg, errp)) {
792         return;
793     }
794     throttle_config(&tg->ts, tg->clock_type, &cfg);
795     QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
796     tg->is_initialized = true;
797 }
798 
799 /* This function edits throttle_groups and must be called under the global
800  * mutex */
throttle_group_obj_finalize(Object * obj)801 static void throttle_group_obj_finalize(Object *obj)
802 {
803     ThrottleGroup *tg = THROTTLE_GROUP(obj);
804     if (tg->is_initialized) {
805         QTAILQ_REMOVE(&throttle_groups, tg, list);
806     }
807     qemu_mutex_destroy(&tg->lock);
808     g_free(tg->name);
809 }
810 
throttle_group_set(Object * obj,Visitor * v,const char * name,void * opaque,Error ** errp)811 static void throttle_group_set(Object *obj, Visitor *v, const char * name,
812                                void *opaque, Error **errp)
813 
814 {
815     ThrottleGroup *tg = THROTTLE_GROUP(obj);
816     ThrottleConfig *cfg;
817     ThrottleParamInfo *info = opaque;
818     int64_t value;
819 
820     /* If we have finished initialization, don't accept individual property
821      * changes through QOM. Throttle configuration limits must be set in one
822      * transaction, as certain combinations are invalid.
823      */
824     if (tg->is_initialized) {
825         error_setg(errp, "Property cannot be set after initialization");
826         return;
827     }
828 
829     if (!visit_type_int64(v, name, &value, errp)) {
830         return;
831     }
832     if (value < 0) {
833         error_setg(errp, "Property values cannot be negative");
834         return;
835     }
836 
837     cfg = &tg->ts.cfg;
838     switch (info->category) {
839     case AVG:
840         cfg->buckets[info->type].avg = value;
841         break;
842     case MAX:
843         cfg->buckets[info->type].max = value;
844         break;
845     case BURST_LENGTH:
846         if (value > UINT_MAX) {
847             error_setg(errp, "%s value must be in the" "range [0, %u]",
848                        info->name, UINT_MAX);
849             return;
850         }
851         cfg->buckets[info->type].burst_length = value;
852         break;
853     case IOPS_SIZE:
854         cfg->op_size = value;
855         break;
856     }
857 }
858 
throttle_group_get(Object * obj,Visitor * v,const char * name,void * opaque,Error ** errp)859 static void throttle_group_get(Object *obj, Visitor *v, const char *name,
860                                void *opaque, Error **errp)
861 {
862     ThrottleGroup *tg = THROTTLE_GROUP(obj);
863     ThrottleConfig cfg;
864     ThrottleParamInfo *info = opaque;
865     int64_t value;
866 
867     throttle_get_config(&tg->ts, &cfg);
868     switch (info->category) {
869     case AVG:
870         value = cfg.buckets[info->type].avg;
871         break;
872     case MAX:
873         value = cfg.buckets[info->type].max;
874         break;
875     case BURST_LENGTH:
876         value = cfg.buckets[info->type].burst_length;
877         break;
878     case IOPS_SIZE:
879         value = cfg.op_size;
880         break;
881     }
882 
883     visit_type_int64(v, name, &value, errp);
884 }
885 
throttle_group_set_limits(Object * obj,Visitor * v,const char * name,void * opaque,Error ** errp)886 static void throttle_group_set_limits(Object *obj, Visitor *v,
887                                       const char *name, void *opaque,
888                                       Error **errp)
889 
890 {
891     ThrottleGroup *tg = THROTTLE_GROUP(obj);
892     ThrottleConfig cfg;
893     ThrottleLimits *argp;
894     Error *local_err = NULL;
895 
896     if (!visit_type_ThrottleLimits(v, name, &argp, errp)) {
897         return;
898     }
899     qemu_mutex_lock(&tg->lock);
900     throttle_get_config(&tg->ts, &cfg);
901     throttle_limits_to_config(argp, &cfg, &local_err);
902     if (local_err) {
903         goto unlock;
904     }
905     throttle_config(&tg->ts, tg->clock_type, &cfg);
906 
907 unlock:
908     qemu_mutex_unlock(&tg->lock);
909     qapi_free_ThrottleLimits(argp);
910     error_propagate(errp, local_err);
911     return;
912 }
913 
throttle_group_get_limits(Object * obj,Visitor * v,const char * name,void * opaque,Error ** errp)914 static void throttle_group_get_limits(Object *obj, Visitor *v,
915                                       const char *name, void *opaque,
916                                       Error **errp)
917 {
918     ThrottleGroup *tg = THROTTLE_GROUP(obj);
919     ThrottleConfig cfg;
920     ThrottleLimits arg = { 0 };
921     ThrottleLimits *argp = &arg;
922 
923     qemu_mutex_lock(&tg->lock);
924     throttle_get_config(&tg->ts, &cfg);
925     qemu_mutex_unlock(&tg->lock);
926 
927     throttle_config_to_limits(&cfg, argp);
928 
929     visit_type_ThrottleLimits(v, name, &argp, errp);
930 }
931 
throttle_group_can_be_deleted(UserCreatable * uc)932 static bool throttle_group_can_be_deleted(UserCreatable *uc)
933 {
934     return OBJECT(uc)->ref == 1;
935 }
936 
throttle_group_obj_class_init(ObjectClass * klass,void * class_data)937 static void throttle_group_obj_class_init(ObjectClass *klass, void *class_data)
938 {
939     size_t i = 0;
940     UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
941 
942     ucc->complete = throttle_group_obj_complete;
943     ucc->can_be_deleted = throttle_group_can_be_deleted;
944 
945     /* individual properties */
946     for (i = 0; i < sizeof(properties) / sizeof(ThrottleParamInfo); i++) {
947         object_class_property_add(klass,
948                                   properties[i].name,
949                                   "int",
950                                   throttle_group_get,
951                                   throttle_group_set,
952                                   NULL, &properties[i]);
953     }
954 
955     /* ThrottleLimits */
956     object_class_property_add(klass,
957                               "limits", "ThrottleLimits",
958                               throttle_group_get_limits,
959                               throttle_group_set_limits,
960                               NULL, NULL);
961 }
962 
963 static const TypeInfo throttle_group_info = {
964     .name = TYPE_THROTTLE_GROUP,
965     .parent = TYPE_OBJECT,
966     .class_init = throttle_group_obj_class_init,
967     .instance_size = sizeof(ThrottleGroup),
968     .instance_init = throttle_group_obj_init,
969     .instance_finalize = throttle_group_obj_finalize,
970     .interfaces = (InterfaceInfo[]) {
971         { TYPE_USER_CREATABLE },
972         { }
973     },
974 };
975 
throttle_groups_init(void)976 static void throttle_groups_init(void)
977 {
978     type_register_static(&throttle_group_info);
979 }
980 
981 type_init(throttle_groups_init);
982