/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/defer-call.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

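/*
 * Overview: worker threads are spawned on demand through a bottom half (so
 * they inherit the main thread's CPU affinity rather than a vcpu's), up to
 * pool->max_threads.  Idle workers exit on their own once the pool holds
 * more than pool->min_threads.  Completed requests are handed back to the
 * pool's AioContext via pool->completion_bh.
 */
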
static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* This list is only written by the thread pool's mother thread.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuCond request_cond;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int min_threads;
    int max_threads;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

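    /*
     * The loop condition below doubles as the shutdown and shrink mechanism:
     * thread_pool_update_params() lowers max_threads and thread_pool_free()
     * forces it to zero, and workers that notice cur_threads > max_threads
     * fall out of the loop and exit.
     */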
    while (pool->cur_threads <= pool->max_threads) {
        ThreadPoolElement *req;
        int ret;

        if (QTAILQ_EMPTY(&pool->request_list)) {
            pool->idle_threads++;
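            /* Linger up to 10 seconds (10000 ms) for new work before
             * deciding whether to exit. */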
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
            pool->idle_threads--;
            if (ret == 0 &&
                QTAILQ_EMPTY(&pool->request_list) &&
                pool->cur_threads > pool->min_threads) {
                /* Timed out + no work to do + no need for warm threads = exit.  */
                break;
            }
            /*
             * Even if there was some work to do, check if there aren't
             * too many worker threads before picking it up.
             */
            continue;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

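        /*
         * Hand the finished request back to the pool's AioContext; it is
         * reaped in thread_pool_completion_bh().  The smp_wmb() above pairs
         * with the smp_rmb() there.
         */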
        qemu_bh_schedule(pool->completion_bh);
        qemu_mutex_lock(&pool->lock);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);

    /*
     * Wake up another thread, in case we got a wakeup but decided
     * to exit due to pool->cur_threads > pool->max_threads.
     */
    qemu_cond_signal(&pool->request_cond);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

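    /*
     * Workers are detached, so they are never joined; thread_pool_free()
     * instead waits on worker_stopped until cur_threads drops to zero.
     */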
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    defer_call_begin(); /* cb() may use defer_call() to coalesce work */

restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            elem->common.cb(elem->common.opaque, elem->ret);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }

    defer_call_end();
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
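    /*
     * Only a request still sitting in the queue can be cancelled; once a
     * worker has marked it THREAD_ACTIVE, the function runs to completion
     * and the callback sees its real return value.
     */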
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;
    AioContext *ctx = qemu_get_current_aio_context();
    ThreadPool *pool = aio_get_thread_pool(ctx);

    /* Assert that the thread submitting work is the same one that runs the
     * pool's AioContext. */
    assert(pool->ctx == qemu_get_current_aio_context());

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
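    /* Wake one idle worker.  The signal is sent after dropping the lock so
     * that a woken worker is less likely to block straight away on the
     * mutex it must re-acquire. */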
    qemu_cond_signal(&pool->request_cond);
    return &req->common;
}

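/*
 * Example usage (a minimal sketch; blocking_work(), work_done() and their
 * arguments are hypothetical):
 *
 *     static int blocking_work(void *arg)
 *     {
 *         ...             // runs in a worker thread, may block
 *         return 0;       // becomes the ret passed to the callback
 *     }
 *
 *     static void work_done(void *opaque, int ret)
 *     {
 *         ...             // runs in the submitting AioContext
 *     }
 *
 *     thread_pool_submit_aio(blocking_work, arg, work_done, opaque);
 */
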
typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

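/*
 * Coroutine wrapper around thread_pool_submit_aio(): the calling coroutine
 * yields after submitting and is re-entered by thread_pool_co_cb() once the
 * worker thread has produced a result.
 */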
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

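/* Fire-and-forget variant: no completion callback, and the return value of
 * func is discarded. */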
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(func, arg, NULL, NULL);
}

void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /*
     * We either have to:
     *  - Increase the number of available threads until over the min_threads
     *    threshold.
     *  - Bump the worker threads so that they exit, until under the
     *    max_threads threshold.
     *  - Do nothing. The current number of threads falls in between the min
     *    and max thresholds. We'll let the pool manage itself.
     */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

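    /*
     * Each signal wakes one worker, which re-checks the loop condition in
     * worker_thread() and exits because cur_threads > max_threads.
     */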
    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    thread_pool_update_params(pool, ctx);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
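    /*
     * Forcing max_threads to zero falsifies every worker's loop condition,
     * so the broadcast below drives them all to exit.
     */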
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}