Lines Matching refs:pool
26 static void do_spawn_thread(ThreadPool *pool);
38 ThreadPool *pool; member
79 ThreadPool *pool = opaque; in worker_thread() local
81 qemu_mutex_lock(&pool->lock); in worker_thread()
82 pool->pending_threads--; in worker_thread()
83 do_spawn_thread(pool); in worker_thread()
85 while (pool->cur_threads <= pool->max_threads) { in worker_thread()
89 if (QTAILQ_EMPTY(&pool->request_list)) { in worker_thread()
90 pool->idle_threads++; in worker_thread()
91 ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000); in worker_thread()
92 pool->idle_threads--; in worker_thread()
94 QTAILQ_EMPTY(&pool->request_list) && in worker_thread()
95 pool->cur_threads > pool->min_threads) { in worker_thread()
106 req = QTAILQ_FIRST(&pool->request_list); in worker_thread()
107 QTAILQ_REMOVE(&pool->request_list, req, reqs); in worker_thread()
109 qemu_mutex_unlock(&pool->lock); in worker_thread()
118 qemu_bh_schedule(pool->completion_bh); in worker_thread()
119 qemu_mutex_lock(&pool->lock); in worker_thread()
122 pool->cur_threads--; in worker_thread()
123 qemu_cond_signal(&pool->worker_stopped); in worker_thread()
129 qemu_cond_signal(&pool->request_cond); in worker_thread()
130 qemu_mutex_unlock(&pool->lock); in worker_thread()
134 static void do_spawn_thread(ThreadPool *pool) in do_spawn_thread() argument
139 if (!pool->new_threads) { in do_spawn_thread()
143 pool->new_threads--; in do_spawn_thread()
144 pool->pending_threads++; in do_spawn_thread()
146 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); in do_spawn_thread()
151 ThreadPool *pool = opaque; in spawn_thread_bh_fn() local
153 qemu_mutex_lock(&pool->lock); in spawn_thread_bh_fn()
154 do_spawn_thread(pool); in spawn_thread_bh_fn()
155 qemu_mutex_unlock(&pool->lock); in spawn_thread_bh_fn()
158 static void spawn_thread(ThreadPool *pool) in spawn_thread() argument
160 pool->cur_threads++; in spawn_thread()
161 pool->new_threads++; in spawn_thread()
169 if (!pool->pending_threads) { in spawn_thread()
170 qemu_bh_schedule(pool->new_thread_bh); in spawn_thread()
176 ThreadPool *pool = opaque; in thread_pool_completion_bh() local
182 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { in thread_pool_completion_bh()
187 trace_thread_pool_complete(pool, elem, elem->common.opaque, in thread_pool_completion_bh()
198 qemu_bh_schedule(pool->completion_bh); in thread_pool_completion_bh()
206 qemu_bh_cancel(pool->completion_bh); in thread_pool_completion_bh()
221 ThreadPool *pool = elem->pool; in thread_pool_cancel() local
225 QEMU_LOCK_GUARD(&pool->lock); in thread_pool_cancel()
227 QTAILQ_REMOVE(&pool->request_list, elem, reqs); in thread_pool_cancel()
228 qemu_bh_schedule(pool->completion_bh); in thread_pool_cancel()
246 ThreadPool *pool = aio_get_thread_pool(ctx); in thread_pool_submit_aio() local
249 assert(pool->ctx == qemu_get_current_aio_context()); in thread_pool_submit_aio()
255 req->pool = pool; in thread_pool_submit_aio()
257 QLIST_INSERT_HEAD(&pool->head, req, all); in thread_pool_submit_aio()
259 trace_thread_pool_submit(pool, req, arg); in thread_pool_submit_aio()
261 qemu_mutex_lock(&pool->lock); in thread_pool_submit_aio()
262 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { in thread_pool_submit_aio()
263 spawn_thread(pool); in thread_pool_submit_aio()
265 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); in thread_pool_submit_aio()
266 qemu_mutex_unlock(&pool->lock); in thread_pool_submit_aio()
267 qemu_cond_signal(&pool->request_cond); in thread_pool_submit_aio()
298 void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) in thread_pool_update_params() argument
300 qemu_mutex_lock(&pool->lock); in thread_pool_update_params()
302 pool->min_threads = ctx->thread_pool_min; in thread_pool_update_params()
303 pool->max_threads = ctx->thread_pool_max; in thread_pool_update_params()
314 for (int i = pool->cur_threads; i < pool->min_threads; i++) { in thread_pool_update_params()
315 spawn_thread(pool); in thread_pool_update_params()
318 for (int i = pool->cur_threads; i > pool->max_threads; i--) { in thread_pool_update_params()
319 qemu_cond_signal(&pool->request_cond); in thread_pool_update_params()
322 qemu_mutex_unlock(&pool->lock); in thread_pool_update_params()
325 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) in thread_pool_init_one() argument
331 memset(pool, 0, sizeof(*pool)); in thread_pool_init_one()
332 pool->ctx = ctx; in thread_pool_init_one()
333 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); in thread_pool_init_one()
334 qemu_mutex_init(&pool->lock); in thread_pool_init_one()
335 qemu_cond_init(&pool->worker_stopped); in thread_pool_init_one()
336 qemu_cond_init(&pool->request_cond); in thread_pool_init_one()
337 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); in thread_pool_init_one()
339 QLIST_INIT(&pool->head); in thread_pool_init_one()
340 QTAILQ_INIT(&pool->request_list); in thread_pool_init_one()
342 thread_pool_update_params(pool, ctx); in thread_pool_init_one()
347 ThreadPool *pool = g_new(ThreadPool, 1); in thread_pool_new() local
348 thread_pool_init_one(pool, ctx); in thread_pool_new()
349 return pool; in thread_pool_new()
352 void thread_pool_free(ThreadPool *pool) in thread_pool_free() argument
354 if (!pool) { in thread_pool_free()
358 assert(QLIST_EMPTY(&pool->head)); in thread_pool_free()
360 qemu_mutex_lock(&pool->lock); in thread_pool_free()
363 qemu_bh_delete(pool->new_thread_bh); in thread_pool_free()
364 pool->cur_threads -= pool->new_threads; in thread_pool_free()
365 pool->new_threads = 0; in thread_pool_free()
368 pool->max_threads = 0; in thread_pool_free()
369 qemu_cond_broadcast(&pool->request_cond); in thread_pool_free()
370 while (pool->cur_threads > 0) { in thread_pool_free()
371 qemu_cond_wait(&pool->worker_stopped, &pool->lock); in thread_pool_free()
374 qemu_mutex_unlock(&pool->lock); in thread_pool_free()
376 qemu_bh_delete(pool->completion_bh); in thread_pool_free()
377 qemu_cond_destroy(&pool->request_cond); in thread_pool_free()
378 qemu_cond_destroy(&pool->worker_stopped); in thread_pool_free()
379 qemu_mutex_destroy(&pool->lock); in thread_pool_free()
380 g_free(pool); in thread_pool_free()