xref: /openbmc/qemu/block/throttle-groups.c (revision 53e116fed6dde572003aebf3bc32e25663eeb446)
1 /*
2  * QEMU block throttling group infrastructure
3  *
4  * Copyright (C) Nodalink, EURL. 2014
5  * Copyright (C) Igalia, S.L. 2015
6  *
7  * Authors:
8  *   Benoît Canet <benoit.canet@nodalink.com>
9  *   Alberto Garcia <berto@igalia.com>
10  *
11  * This program is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU General Public License as
13  * published by the Free Software Foundation; either version 2 or
14  * (at your option) version 3 of the License.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with this program; if not, see <http://www.gnu.org/licenses/>.
23  */
24 
25 #include "qemu/osdep.h"
26 #include "sysemu/block-backend.h"
27 #include "block/throttle-groups.h"
28 #include "qemu/throttle-options.h"
29 #include "qemu/queue.h"
30 #include "qemu/thread.h"
31 #include "sysemu/qtest.h"
32 #include "qapi/error.h"
33 #include "qapi/qapi-visit-block-core.h"
34 #include "qom/object.h"
35 #include "qom/object_interfaces.h"
36 
/* Forward declarations of functions that are defined further down in
 * this file. */
static void throttle_group_obj_init(Object *obj);
static void throttle_group_obj_complete(UserCreatable *obj, Error **errp);
static void timer_cb(ThrottleGroupMember *tgm, bool is_write);
40 
/* The ThrottleGroup structure (with its ThrottleState) is shared
 * among different ThrottleGroupMembers and it's independent from
 * AioContext, so in order to use it from different threads it needs
 * its own locking.
 *
 * This locking is however handled internally in this file, so it's
 * transparent to outside users.
 *
 * The whole ThrottleGroup structure is private and invisible to
 * outside users, that only use it through its ThrottleState.
 *
 * In addition to the ThrottleGroup structure, ThrottleGroupMember has
 * fields that need to be accessed by other members of the group and
 * therefore also need to be protected by this lock. Once a
 * ThrottleGroupMember is registered in a group those fields can be
 * accessed by other threads at any time.
 *
 * Again, all this is handled internally and is mostly transparent to
 * the outside. The 'throttle_timers' field however has an additional
 * constraint because it may be temporarily invalid (see for example
 * blk_set_aio_context()). Therefore in this file a thread will
 * access some other ThrottleGroupMember's timers only after verifying
 * that that ThrottleGroupMember has throttled requests in the queue.
 */
typedef struct ThrottleGroup {
    Object parent_obj;

    /* refuse individual property change if initialization is complete */
    bool is_initialized;
    char *name; /* This is constant during the lifetime of the group */

    /* This lock protects the four fields that immediately follow it:
     * ts, head, tokens and any_timer_armed */
    QemuMutex lock;
    ThrottleState ts;
    /* Members of the group, walked in round-robin order */
    QLIST_HEAD(, ThrottleGroupMember) head;
    /* Current round-robin token, indexed by is_write (0 = read, 1 = write) */
    ThrottleGroupMember *tokens[2];
    /* Whether some member has a timer armed, indexed by is_write */
    bool any_timer_armed[2];
    /* Assigned in throttle_group_obj_init(); nothing else in this file
     * writes to it */
    QEMUClockType clock_type;

    /* This field is protected by the global QEMU mutex */
    QTAILQ_ENTRY(ThrottleGroup) list;
} ThrottleGroup;
82 
/* List of all the ThrottleGroups that completed initialization (see
 * throttle_group_obj_complete() and throttle_group_obj_finalize()).
 *
 * This is protected by the global QEMU mutex */
static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
    QTAILQ_HEAD_INITIALIZER(throttle_groups);
86 
87 
88 /* This function reads throttle_groups and must be called under the global
89  * mutex.
90  */
91 static ThrottleGroup *throttle_group_by_name(const char *name)
92 {
93     ThrottleGroup *iter;
94 
95     /* Look for an existing group with that name */
96     QTAILQ_FOREACH(iter, &throttle_groups, list) {
97         if (!g_strcmp0(name, iter->name)) {
98             return iter;
99         }
100     }
101 
102     return NULL;
103 }
104 
105 /* This function reads throttle_groups and must be called under the global
106  * mutex.
107  */
108 bool throttle_group_exists(const char *name)
109 {
110     return throttle_group_by_name(name) != NULL;
111 }
112 
113 /* Increments the reference count of a ThrottleGroup given its name.
114  *
115  * If no ThrottleGroup is found with the given name a new one is
116  * created.
117  *
118  * This function edits throttle_groups and must be called under the global
119  * mutex.
120  *
121  * @name: the name of the ThrottleGroup
122  * @ret:  the ThrottleState member of the ThrottleGroup
123  */
124 ThrottleState *throttle_group_incref(const char *name)
125 {
126     ThrottleGroup *tg = NULL;
127 
128     /* Look for an existing group with that name */
129     tg = throttle_group_by_name(name);
130 
131     if (tg) {
132         object_ref(OBJECT(tg));
133     } else {
134         /* Create a new one if not found */
135         /* new ThrottleGroup obj will have a refcnt = 1 */
136         tg = THROTTLE_GROUP(object_new(TYPE_THROTTLE_GROUP));
137         tg->name = g_strdup(name);
138         throttle_group_obj_complete(USER_CREATABLE(tg), &error_abort);
139     }
140 
141     return &tg->ts;
142 }
143 
144 /* Decrease the reference count of a ThrottleGroup.
145  *
146  * When the reference count reaches zero the ThrottleGroup is
147  * destroyed.
148  *
149  * This function edits throttle_groups and must be called under the global
150  * mutex.
151  *
152  * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
153  */
154 void throttle_group_unref(ThrottleState *ts)
155 {
156     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
157     object_unref(OBJECT(tg));
158 }
159 
160 /* Get the name from a ThrottleGroupMember's group. The name (and the pointer)
161  * is guaranteed to remain constant during the lifetime of the group.
162  *
163  * @tgm:  a ThrottleGroupMember
164  * @ret:  the name of the group.
165  */
166 const char *throttle_group_get_name(ThrottleGroupMember *tgm)
167 {
168     ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
169     return tg->name;
170 }
171 
172 /* Return the next ThrottleGroupMember in the round-robin sequence, simulating
173  * a circular list.
174  *
175  * This assumes that tg->lock is held.
176  *
177  * @tgm: the current ThrottleGroupMember
178  * @ret: the next ThrottleGroupMember in the sequence
179  */
180 static ThrottleGroupMember *throttle_group_next_tgm(ThrottleGroupMember *tgm)
181 {
182     ThrottleState *ts = tgm->throttle_state;
183     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
184     ThrottleGroupMember *next = QLIST_NEXT(tgm, round_robin);
185 
186     if (!next) {
187         next = QLIST_FIRST(&tg->head);
188     }
189 
190     return next;
191 }
192 
193 /*
194  * Return whether a ThrottleGroupMember has pending requests.
195  *
196  * This assumes that tg->lock is held.
197  *
198  * @tgm:        the ThrottleGroupMember
199  * @is_write:   the type of operation (read/write)
200  * @ret:        whether the ThrottleGroupMember has pending requests.
201  */
202 static inline bool tgm_has_pending_reqs(ThrottleGroupMember *tgm,
203                                         bool is_write)
204 {
205     return tgm->pending_reqs[is_write];
206 }
207 
208 /* Return the next ThrottleGroupMember in the round-robin sequence with pending
209  * I/O requests.
210  *
211  * This assumes that tg->lock is held.
212  *
213  * @tgm:       the current ThrottleGroupMember
214  * @is_write:  the type of operation (read/write)
215  * @ret:       the next ThrottleGroupMember with pending requests, or tgm if
216  *             there is none.
217  */
218 static ThrottleGroupMember *next_throttle_token(ThrottleGroupMember *tgm,
219                                                 bool is_write)
220 {
221     ThrottleState *ts = tgm->throttle_state;
222     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
223     ThrottleGroupMember *token, *start;
224 
225     /* If this member has its I/O limits disabled then it means that
226      * it's being drained. Skip the round-robin search and return tgm
227      * immediately if it has pending requests. Otherwise we could be
228      * forcing it to wait for other member's throttled requests. */
229     if (tgm_has_pending_reqs(tgm, is_write) &&
230         atomic_read(&tgm->io_limits_disabled)) {
231         return tgm;
232     }
233 
234     start = token = tg->tokens[is_write];
235 
236     /* get next bs round in round robin style */
237     token = throttle_group_next_tgm(token);
238     while (token != start && !tgm_has_pending_reqs(token, is_write)) {
239         token = throttle_group_next_tgm(token);
240     }
241 
242     /* If no IO are queued for scheduling on the next round robin token
243      * then decide the token is the current tgm because chances are
244      * the current tgm got the current request queued.
245      */
246     if (token == start && !tgm_has_pending_reqs(token, is_write)) {
247         token = tgm;
248     }
249 
250     /* Either we return the original TGM, or one with pending requests */
251     assert(token == tgm || tgm_has_pending_reqs(token, is_write));
252 
253     return token;
254 }
255 
256 /* Check if the next I/O request for a ThrottleGroupMember needs to be
257  * throttled or not. If there's no timer set in this group, set one and update
258  * the token accordingly.
259  *
260  * This assumes that tg->lock is held.
261  *
262  * @tgm:        the current ThrottleGroupMember
263  * @is_write:   the type of operation (read/write)
264  * @ret:        whether the I/O request needs to be throttled or not
265  */
266 static bool throttle_group_schedule_timer(ThrottleGroupMember *tgm,
267                                           bool is_write)
268 {
269     ThrottleState *ts = tgm->throttle_state;
270     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
271     ThrottleTimers *tt = &tgm->throttle_timers;
272     bool must_wait;
273 
274     if (atomic_read(&tgm->io_limits_disabled)) {
275         return false;
276     }
277 
278     /* Check if any of the timers in this group is already armed */
279     if (tg->any_timer_armed[is_write]) {
280         return true;
281     }
282 
283     must_wait = throttle_schedule_timer(ts, tt, is_write);
284 
285     /* If a timer just got armed, set tgm as the current token */
286     if (must_wait) {
287         tg->tokens[is_write] = tgm;
288         tg->any_timer_armed[is_write] = true;
289     }
290 
291     return must_wait;
292 }
293 
294 /* Start the next pending I/O request for a ThrottleGroupMember. Return whether
295  * any request was actually pending.
296  *
297  * @tgm:       the current ThrottleGroupMember
298  * @is_write:  the type of operation (read/write)
299  */
300 static bool coroutine_fn throttle_group_co_restart_queue(ThrottleGroupMember *tgm,
301                                                          bool is_write)
302 {
303     bool ret;
304 
305     qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
306     ret = qemu_co_queue_next(&tgm->throttled_reqs[is_write]);
307     qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
308 
309     return ret;
310 }
311 
312 /* Look for the next pending I/O request and schedule it.
313  *
314  * This assumes that tg->lock is held.
315  *
316  * @tgm:       the current ThrottleGroupMember
317  * @is_write:  the type of operation (read/write)
318  */
319 static void schedule_next_request(ThrottleGroupMember *tgm, bool is_write)
320 {
321     ThrottleState *ts = tgm->throttle_state;
322     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
323     bool must_wait;
324     ThrottleGroupMember *token;
325 
326     /* Check if there's any pending request to schedule next */
327     token = next_throttle_token(tgm, is_write);
328     if (!tgm_has_pending_reqs(token, is_write)) {
329         return;
330     }
331 
332     /* Set a timer for the request if it needs to be throttled */
333     must_wait = throttle_group_schedule_timer(token, is_write);
334 
335     /* If it doesn't have to wait, queue it for immediate execution */
336     if (!must_wait) {
337         /* Give preference to requests from the current tgm */
338         if (qemu_in_coroutine() &&
339             throttle_group_co_restart_queue(tgm, is_write)) {
340             token = tgm;
341         } else {
342             ThrottleTimers *tt = &token->throttle_timers;
343             int64_t now = qemu_clock_get_ns(tg->clock_type);
344             timer_mod(tt->timers[is_write], now);
345             tg->any_timer_armed[is_write] = true;
346         }
347         tg->tokens[is_write] = token;
348     }
349 }
350 
/* Check if an I/O request needs to be throttled, wait and set a timer
 * if necessary, and schedule the next request using a round robin
 * algorithm.
 *
 * tg->lock is taken on entry and released before returning. It is also
 * dropped while the calling coroutine sleeps in the throttled queue, and
 * re-taken once the coroutine is woken up.
 *
 * @tgm:       the current ThrottleGroupMember
 * @bytes:     the number of bytes for this I/O
 * @is_write:  the type of operation (read/write)
 */
void coroutine_fn throttle_group_co_io_limits_intercept(ThrottleGroupMember *tgm,
                                                        unsigned int bytes,
                                                        bool is_write)
{
    bool must_wait;
    ThrottleGroupMember *token;
    ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);

    /* First we check if this I/O has to be throttled. */
    token = next_throttle_token(tgm, is_write);
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* Wait if there's a timer set or queued requests of this type */
    if (must_wait || tgm->pending_reqs[is_write]) {
        /* Count this request as pending while tg->lock is still held */
        tgm->pending_reqs[is_write]++;
        /* Release tg->lock before sleeping; the queue itself is guarded
         * by throttled_reqs_lock */
        qemu_mutex_unlock(&tg->lock);
        qemu_co_mutex_lock(&tgm->throttled_reqs_lock);
        qemu_co_queue_wait(&tgm->throttled_reqs[is_write],
                           &tgm->throttled_reqs_lock);
        qemu_co_mutex_unlock(&tgm->throttled_reqs_lock);
        /* Woken up: re-take tg->lock before touching the counter again */
        qemu_mutex_lock(&tg->lock);
        tgm->pending_reqs[is_write]--;
    }

    /* The I/O will be executed, so do the accounting */
    throttle_account(tgm->throttle_state, is_write, bytes);

    /* Schedule the next request */
    schedule_next_request(tgm, is_write);

    qemu_mutex_unlock(&tg->lock);
}
392 
/* Argument bundle handed to throttle_group_restart_queue_entry(). It is
 * allocated in throttle_group_restart_queue() and freed by the coroutine
 * entry point. */
typedef struct {
    ThrottleGroupMember *tgm; /* member whose queue is to be restarted */
    bool is_write;            /* the type of operation (read/write) */
} RestartData;
397 
398 static void coroutine_fn throttle_group_restart_queue_entry(void *opaque)
399 {
400     RestartData *data = opaque;
401     ThrottleGroupMember *tgm = data->tgm;
402     ThrottleState *ts = tgm->throttle_state;
403     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
404     bool is_write = data->is_write;
405     bool empty_queue;
406 
407     empty_queue = !throttle_group_co_restart_queue(tgm, is_write);
408 
409     /* If the request queue was empty then we have to take care of
410      * scheduling the next one */
411     if (empty_queue) {
412         qemu_mutex_lock(&tg->lock);
413         schedule_next_request(tgm, is_write);
414         qemu_mutex_unlock(&tg->lock);
415     }
416 
417     g_free(data);
418 
419     atomic_dec(&tgm->restart_pending);
420     aio_wait_kick();
421 }
422 
423 static void throttle_group_restart_queue(ThrottleGroupMember *tgm, bool is_write)
424 {
425     Coroutine *co;
426     RestartData *rd = g_new0(RestartData, 1);
427 
428     rd->tgm = tgm;
429     rd->is_write = is_write;
430 
431     /* This function is called when a timer is fired or when
432      * throttle_group_restart_tgm() is called. Either way, there can
433      * be no timer pending on this tgm at this point */
434     assert(!timer_pending(tgm->throttle_timers.timers[is_write]));
435 
436     atomic_inc(&tgm->restart_pending);
437 
438     co = qemu_coroutine_create(throttle_group_restart_queue_entry, rd);
439     aio_co_enter(tgm->aio_context, co);
440 }
441 
442 void throttle_group_restart_tgm(ThrottleGroupMember *tgm)
443 {
444     int i;
445 
446     if (tgm->throttle_state) {
447         for (i = 0; i < 2; i++) {
448             QEMUTimer *t = tgm->throttle_timers.timers[i];
449             if (timer_pending(t)) {
450                 /* If there's a pending timer on this tgm, fire it now */
451                 timer_del(t);
452                 timer_cb(tgm, i);
453             } else {
454                 /* Else run the next request from the queue manually */
455                 throttle_group_restart_queue(tgm, i);
456             }
457         }
458     }
459 }
460 
461 /* Update the throttle configuration for a particular group. Similar
462  * to throttle_config(), but guarantees atomicity within the
463  * throttling group.
464  *
465  * @tgm:    a ThrottleGroupMember that is a member of the group
466  * @cfg: the configuration to set
467  */
468 void throttle_group_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
469 {
470     ThrottleState *ts = tgm->throttle_state;
471     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
472     qemu_mutex_lock(&tg->lock);
473     throttle_config(ts, tg->clock_type, cfg);
474     qemu_mutex_unlock(&tg->lock);
475 
476     throttle_group_restart_tgm(tgm);
477 }
478 
479 /* Get the throttle configuration from a particular group. Similar to
480  * throttle_get_config(), but guarantees atomicity within the
481  * throttling group.
482  *
483  * @tgm:    a ThrottleGroupMember that is a member of the group
484  * @cfg: the configuration will be written here
485  */
486 void throttle_group_get_config(ThrottleGroupMember *tgm, ThrottleConfig *cfg)
487 {
488     ThrottleState *ts = tgm->throttle_state;
489     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
490     qemu_mutex_lock(&tg->lock);
491     throttle_get_config(ts, cfg);
492     qemu_mutex_unlock(&tg->lock);
493 }
494 
495 /* ThrottleTimers callback. This wakes up a request that was waiting
496  * because it had been throttled.
497  *
498  * @tgm:       the ThrottleGroupMember whose request had been throttled
499  * @is_write:  the type of operation (read/write)
500  */
501 static void timer_cb(ThrottleGroupMember *tgm, bool is_write)
502 {
503     ThrottleState *ts = tgm->throttle_state;
504     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
505 
506     /* The timer has just been fired, so we can update the flag */
507     qemu_mutex_lock(&tg->lock);
508     tg->any_timer_armed[is_write] = false;
509     qemu_mutex_unlock(&tg->lock);
510 
511     /* Run the request that was waiting for this timer */
512     throttle_group_restart_queue(tgm, is_write);
513 }
514 
515 static void read_timer_cb(void *opaque)
516 {
517     timer_cb(opaque, false);
518 }
519 
520 static void write_timer_cb(void *opaque)
521 {
522     timer_cb(opaque, true);
523 }
524 
525 /* Register a ThrottleGroupMember from the throttling group, also initializing
526  * its timers and updating its throttle_state pointer to point to it. If a
527  * throttling group with that name does not exist yet, it will be created.
528  *
529  * This function edits throttle_groups and must be called under the global
530  * mutex.
531  *
532  * @tgm:       the ThrottleGroupMember to insert
533  * @groupname: the name of the group
534  * @ctx:       the AioContext to use
535  */
536 void throttle_group_register_tgm(ThrottleGroupMember *tgm,
537                                  const char *groupname,
538                                  AioContext *ctx)
539 {
540     int i;
541     ThrottleState *ts = throttle_group_incref(groupname);
542     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
543 
544     tgm->throttle_state = ts;
545     tgm->aio_context = ctx;
546     atomic_set(&tgm->restart_pending, 0);
547 
548     qemu_mutex_lock(&tg->lock);
549     /* If the ThrottleGroup is new set this ThrottleGroupMember as the token */
550     for (i = 0; i < 2; i++) {
551         if (!tg->tokens[i]) {
552             tg->tokens[i] = tgm;
553         }
554     }
555 
556     QLIST_INSERT_HEAD(&tg->head, tgm, round_robin);
557 
558     throttle_timers_init(&tgm->throttle_timers,
559                          tgm->aio_context,
560                          tg->clock_type,
561                          read_timer_cb,
562                          write_timer_cb,
563                          tgm);
564     qemu_co_mutex_init(&tgm->throttled_reqs_lock);
565     qemu_co_queue_init(&tgm->throttled_reqs[0]);
566     qemu_co_queue_init(&tgm->throttled_reqs[1]);
567 
568     qemu_mutex_unlock(&tg->lock);
569 }
570 
571 /* Unregister a ThrottleGroupMember from its group, removing it from the list,
572  * destroying the timers and setting the throttle_state pointer to NULL.
573  *
574  * The ThrottleGroupMember must not have pending throttled requests, so the
575  * caller has to drain them first.
576  *
577  * The group will be destroyed if it's empty after this operation.
578  *
579  * @tgm the ThrottleGroupMember to remove
580  */
581 void throttle_group_unregister_tgm(ThrottleGroupMember *tgm)
582 {
583     ThrottleState *ts = tgm->throttle_state;
584     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
585     ThrottleGroupMember *token;
586     int i;
587 
588     if (!ts) {
589         /* Discard already unregistered tgm */
590         return;
591     }
592 
593     /* Wait for throttle_group_restart_queue_entry() coroutines to finish */
594     AIO_WAIT_WHILE(tgm->aio_context, atomic_read(&tgm->restart_pending) > 0);
595 
596     qemu_mutex_lock(&tg->lock);
597     for (i = 0; i < 2; i++) {
598         assert(tgm->pending_reqs[i] == 0);
599         assert(qemu_co_queue_empty(&tgm->throttled_reqs[i]));
600         assert(!timer_pending(tgm->throttle_timers.timers[i]));
601         if (tg->tokens[i] == tgm) {
602             token = throttle_group_next_tgm(tgm);
603             /* Take care of the case where this is the last tgm in the group */
604             if (token == tgm) {
605                 token = NULL;
606             }
607             tg->tokens[i] = token;
608         }
609     }
610 
611     /* remove the current tgm from the list */
612     QLIST_REMOVE(tgm, round_robin);
613     throttle_timers_destroy(&tgm->throttle_timers);
614     qemu_mutex_unlock(&tg->lock);
615 
616     throttle_group_unref(&tg->ts);
617     tgm->throttle_state = NULL;
618 }
619 
620 void throttle_group_attach_aio_context(ThrottleGroupMember *tgm,
621                                        AioContext *new_context)
622 {
623     ThrottleTimers *tt = &tgm->throttle_timers;
624     throttle_timers_attach_aio_context(tt, new_context);
625     tgm->aio_context = new_context;
626 }
627 
628 void throttle_group_detach_aio_context(ThrottleGroupMember *tgm)
629 {
630     ThrottleGroup *tg = container_of(tgm->throttle_state, ThrottleGroup, ts);
631     ThrottleTimers *tt = &tgm->throttle_timers;
632     int i;
633 
634     /* Requests must have been drained */
635     assert(tgm->pending_reqs[0] == 0 && tgm->pending_reqs[1] == 0);
636     assert(qemu_co_queue_empty(&tgm->throttled_reqs[0]));
637     assert(qemu_co_queue_empty(&tgm->throttled_reqs[1]));
638 
639     /* Kick off next ThrottleGroupMember, if necessary */
640     qemu_mutex_lock(&tg->lock);
641     for (i = 0; i < 2; i++) {
642         if (timer_pending(tt->timers[i])) {
643             tg->any_timer_armed[i] = false;
644             schedule_next_request(tgm, i);
645         }
646     }
647     qemu_mutex_unlock(&tg->lock);
648 
649     throttle_timers_detach_aio_context(tt);
650     tgm->aio_context = NULL;
651 }
652 
/* Redefine the option prefix: the names of the individual QOM properties
 * in the table below are built by prepending "x-" to the option names. */
#undef THROTTLE_OPT_PREFIX
#define THROTTLE_OPT_PREFIX "x-"
655 
/* Helper struct and array for QOM property setter/getter.
 *
 * Each entry describes one individual property: its name, the bucket it
 * refers to and which value it selects (see throttle_group_set() and
 * throttle_group_get()). */
typedef struct {
    const char *name;   /* QOM property name */
    BucketType type;    /* index into cfg.buckets[]; unused for IOPS_SIZE */
    enum {
        AVG,            /* the bucket's 'avg' field */
        MAX,            /* the bucket's 'max' field */
        BURST_LENGTH,   /* the bucket's 'burst_length' field */
        IOPS_SIZE,      /* the config's 'op_size' field */
    } category;
} ThrottleParamInfo;
667 
668 static ThrottleParamInfo properties[] = {
669     {
670         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL,
671         THROTTLE_OPS_TOTAL, AVG,
672     },
673     {
674         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX,
675         THROTTLE_OPS_TOTAL, MAX,
676     },
677     {
678         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_TOTAL_MAX_LENGTH,
679         THROTTLE_OPS_TOTAL, BURST_LENGTH,
680     },
681     {
682         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ,
683         THROTTLE_OPS_READ, AVG,
684     },
685     {
686         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX,
687         THROTTLE_OPS_READ, MAX,
688     },
689     {
690         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_READ_MAX_LENGTH,
691         THROTTLE_OPS_READ, BURST_LENGTH,
692     },
693     {
694         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE,
695         THROTTLE_OPS_WRITE, AVG,
696     },
697     {
698         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX,
699         THROTTLE_OPS_WRITE, MAX,
700     },
701     {
702         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_WRITE_MAX_LENGTH,
703         THROTTLE_OPS_WRITE, BURST_LENGTH,
704     },
705     {
706         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL,
707         THROTTLE_BPS_TOTAL, AVG,
708     },
709     {
710         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX,
711         THROTTLE_BPS_TOTAL, MAX,
712     },
713     {
714         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_TOTAL_MAX_LENGTH,
715         THROTTLE_BPS_TOTAL, BURST_LENGTH,
716     },
717     {
718         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ,
719         THROTTLE_BPS_READ, AVG,
720     },
721     {
722         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX,
723         THROTTLE_BPS_READ, MAX,
724     },
725     {
726         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_READ_MAX_LENGTH,
727         THROTTLE_BPS_READ, BURST_LENGTH,
728     },
729     {
730         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE,
731         THROTTLE_BPS_WRITE, AVG,
732     },
733     {
734         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX,
735         THROTTLE_BPS_WRITE, MAX,
736     },
737     {
738         THROTTLE_OPT_PREFIX QEMU_OPT_BPS_WRITE_MAX_LENGTH,
739         THROTTLE_BPS_WRITE, BURST_LENGTH,
740     },
741     {
742         THROTTLE_OPT_PREFIX QEMU_OPT_IOPS_SIZE,
743         0, IOPS_SIZE,
744     }
745 };
746 
747 /* This function edits throttle_groups and must be called under the global
748  * mutex */
749 static void throttle_group_obj_init(Object *obj)
750 {
751     ThrottleGroup *tg = THROTTLE_GROUP(obj);
752 
753     tg->clock_type = QEMU_CLOCK_REALTIME;
754     if (qtest_enabled()) {
755         /* For testing block IO throttling only */
756         tg->clock_type = QEMU_CLOCK_VIRTUAL;
757     }
758     tg->is_initialized = false;
759     qemu_mutex_init(&tg->lock);
760     throttle_init(&tg->ts);
761     QLIST_INIT(&tg->head);
762 }
763 
764 /* This function edits throttle_groups and must be called under the global
765  * mutex */
766 static void throttle_group_obj_complete(UserCreatable *obj, Error **errp)
767 {
768     ThrottleGroup *tg = THROTTLE_GROUP(obj);
769     ThrottleConfig cfg;
770 
771     /* set group name to object id if it exists */
772     if (!tg->name && tg->parent_obj.parent) {
773         tg->name = object_get_canonical_path_component(OBJECT(obj));
774     }
775     /* We must have a group name at this point */
776     assert(tg->name);
777 
778     /* error if name is duplicate */
779     if (throttle_group_exists(tg->name)) {
780         error_setg(errp, "A group with this name already exists");
781         return;
782     }
783 
784     /* check validity */
785     throttle_get_config(&tg->ts, &cfg);
786     if (!throttle_is_valid(&cfg, errp)) {
787         return;
788     }
789     throttle_config(&tg->ts, tg->clock_type, &cfg);
790     QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
791     tg->is_initialized = true;
792 }
793 
794 /* This function edits throttle_groups and must be called under the global
795  * mutex */
796 static void throttle_group_obj_finalize(Object *obj)
797 {
798     ThrottleGroup *tg = THROTTLE_GROUP(obj);
799     if (tg->is_initialized) {
800         QTAILQ_REMOVE(&throttle_groups, tg, list);
801     }
802     qemu_mutex_destroy(&tg->lock);
803     g_free(tg->name);
804 }
805 
806 static void throttle_group_set(Object *obj, Visitor *v, const char * name,
807                                void *opaque, Error **errp)
808 
809 {
810     ThrottleGroup *tg = THROTTLE_GROUP(obj);
811     ThrottleConfig *cfg;
812     ThrottleParamInfo *info = opaque;
813     Error *local_err = NULL;
814     int64_t value;
815 
816     /* If we have finished initialization, don't accept individual property
817      * changes through QOM. Throttle configuration limits must be set in one
818      * transaction, as certain combinations are invalid.
819      */
820     if (tg->is_initialized) {
821         error_setg(&local_err, "Property cannot be set after initialization");
822         goto ret;
823     }
824 
825     visit_type_int64(v, name, &value, &local_err);
826     if (local_err) {
827         goto ret;
828     }
829     if (value < 0) {
830         error_setg(&local_err, "Property values cannot be negative");
831         goto ret;
832     }
833 
834     cfg = &tg->ts.cfg;
835     switch (info->category) {
836     case AVG:
837         cfg->buckets[info->type].avg = value;
838         break;
839     case MAX:
840         cfg->buckets[info->type].max = value;
841         break;
842     case BURST_LENGTH:
843         if (value > UINT_MAX) {
844             error_setg(&local_err, "%s value must be in the"
845                        "range [0, %u]", info->name, UINT_MAX);
846             goto ret;
847         }
848         cfg->buckets[info->type].burst_length = value;
849         break;
850     case IOPS_SIZE:
851         cfg->op_size = value;
852         break;
853     }
854 
855 ret:
856     error_propagate(errp, local_err);
857     return;
858 
859 }
860 
861 static void throttle_group_get(Object *obj, Visitor *v, const char *name,
862                                void *opaque, Error **errp)
863 {
864     ThrottleGroup *tg = THROTTLE_GROUP(obj);
865     ThrottleConfig cfg;
866     ThrottleParamInfo *info = opaque;
867     int64_t value;
868 
869     throttle_get_config(&tg->ts, &cfg);
870     switch (info->category) {
871     case AVG:
872         value = cfg.buckets[info->type].avg;
873         break;
874     case MAX:
875         value = cfg.buckets[info->type].max;
876         break;
877     case BURST_LENGTH:
878         value = cfg.buckets[info->type].burst_length;
879         break;
880     case IOPS_SIZE:
881         value = cfg.op_size;
882         break;
883     }
884 
885     visit_type_int64(v, name, &value, errp);
886 }
887 
888 static void throttle_group_set_limits(Object *obj, Visitor *v,
889                                       const char *name, void *opaque,
890                                       Error **errp)
891 
892 {
893     ThrottleGroup *tg = THROTTLE_GROUP(obj);
894     ThrottleConfig cfg;
895     ThrottleLimits arg = { 0 };
896     ThrottleLimits *argp = &arg;
897     Error *local_err = NULL;
898 
899     visit_type_ThrottleLimits(v, name, &argp, &local_err);
900     if (local_err) {
901         goto ret;
902     }
903     qemu_mutex_lock(&tg->lock);
904     throttle_get_config(&tg->ts, &cfg);
905     throttle_limits_to_config(argp, &cfg, &local_err);
906     if (local_err) {
907         goto unlock;
908     }
909     throttle_config(&tg->ts, tg->clock_type, &cfg);
910 
911 unlock:
912     qemu_mutex_unlock(&tg->lock);
913 ret:
914     error_propagate(errp, local_err);
915     return;
916 }
917 
918 static void throttle_group_get_limits(Object *obj, Visitor *v,
919                                       const char *name, void *opaque,
920                                       Error **errp)
921 {
922     ThrottleGroup *tg = THROTTLE_GROUP(obj);
923     ThrottleConfig cfg;
924     ThrottleLimits arg = { 0 };
925     ThrottleLimits *argp = &arg;
926 
927     qemu_mutex_lock(&tg->lock);
928     throttle_get_config(&tg->ts, &cfg);
929     qemu_mutex_unlock(&tg->lock);
930 
931     throttle_config_to_limits(&cfg, argp);
932 
933     visit_type_ThrottleLimits(v, name, &argp, errp);
934 }
935 
936 static bool throttle_group_can_be_deleted(UserCreatable *uc)
937 {
938     return OBJECT(uc)->ref == 1;
939 }
940 
941 static void throttle_group_obj_class_init(ObjectClass *klass, void *class_data)
942 {
943     size_t i = 0;
944     UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
945 
946     ucc->complete = throttle_group_obj_complete;
947     ucc->can_be_deleted = throttle_group_can_be_deleted;
948 
949     /* individual properties */
950     for (i = 0; i < sizeof(properties) / sizeof(ThrottleParamInfo); i++) {
951         object_class_property_add(klass,
952                                   properties[i].name,
953                                   "int",
954                                   throttle_group_get,
955                                   throttle_group_set,
956                                   NULL, &properties[i],
957                                   &error_abort);
958     }
959 
960     /* ThrottleLimits */
961     object_class_property_add(klass,
962                               "limits", "ThrottleLimits",
963                               throttle_group_get_limits,
964                               throttle_group_set_limits,
965                               NULL, NULL,
966                               &error_abort);
967 }
968 
/* QOM type description of the throttle group object: a plain Object
 * subclass that implements the UserCreatable interface. */
static const TypeInfo throttle_group_info = {
    .name = TYPE_THROTTLE_GROUP,
    .parent = TYPE_OBJECT,
    .class_init = throttle_group_obj_class_init,
    .instance_size = sizeof(ThrottleGroup),
    .instance_init = throttle_group_obj_init,
    .instance_finalize = throttle_group_obj_finalize,
    .interfaces = (InterfaceInfo[]) {
        { TYPE_USER_CREATABLE },
        { } /* empty entry terminates the list */
    },
};
981 
/* Register the throttle group type with QOM. */
static void throttle_groups_init(void)
{
    type_register_static(&throttle_group_info);
}

/* Hook throttle_groups_init() into the type initialization mechanism */
type_init(throttle_groups_init);
988