1 /* 2 * QEMU block layer thread pool 3 * 4 * Copyright IBM, Corp. 2008 5 * Copyright Red Hat, Inc. 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 #include "qemu/osdep.h" 18 #include "qemu/queue.h" 19 #include "qemu/thread.h" 20 #include "qemu/coroutine.h" 21 #include "trace.h" 22 #include "block/thread-pool.h" 23 #include "qemu/main-loop.h" 24 25 static void do_spawn_thread(ThreadPool *pool); 26 27 typedef struct ThreadPoolElement ThreadPoolElement; 28 29 enum ThreadState { 30 THREAD_QUEUED, 31 THREAD_ACTIVE, 32 THREAD_DONE, 33 }; 34 35 struct ThreadPoolElement { 36 BlockAIOCB common; 37 ThreadPool *pool; 38 ThreadPoolFunc *func; 39 void *arg; 40 41 /* Moving state out of THREAD_QUEUED is protected by lock. After 42 * that, only the worker thread can write to it. Reads and writes 43 * of state and ret are ordered with memory barriers. 44 */ 45 enum ThreadState state; 46 int ret; 47 48 /* Access to this list is protected by lock. */ 49 QTAILQ_ENTRY(ThreadPoolElement) reqs; 50 51 /* This list is only written by the thread pool's mother thread. */ 52 QLIST_ENTRY(ThreadPoolElement) all; 53 }; 54 55 struct ThreadPool { 56 AioContext *ctx; 57 QEMUBH *completion_bh; 58 QemuMutex lock; 59 QemuCond worker_stopped; 60 QemuCond request_cond; 61 QEMUBH *new_thread_bh; 62 63 /* The following variables are only accessed from one AioContext. */ 64 QLIST_HEAD(, ThreadPoolElement) head; 65 66 /* The following variables are protected by lock. */ 67 QTAILQ_HEAD(, ThreadPoolElement) request_list; 68 int cur_threads; 69 int idle_threads; 70 int new_threads; /* backlog of threads we need to create */ 71 int pending_threads; /* threads created but not running yet */ 72 int min_threads; 73 int max_threads; 74 }; 75 76 static void *worker_thread(void *opaque) 77 { 78 ThreadPool *pool = opaque; 79 80 qemu_mutex_lock(&pool->lock); 81 pool->pending_threads--; 82 do_spawn_thread(pool); 83 84 while (pool->cur_threads <= pool->max_threads) { 85 ThreadPoolElement *req; 86 int ret; 87 88 if (QTAILQ_EMPTY(&pool->request_list)) { 89 pool->idle_threads++; 90 ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000); 91 pool->idle_threads--; 92 if (ret == 0 && 93 QTAILQ_EMPTY(&pool->request_list) && 94 pool->cur_threads > pool->min_threads) { 95 /* Timed out + no work to do + no need for warm threads = exit. */ 96 break; 97 } 98 /* 99 * Even if there was some work to do, check if there aren't 100 * too many worker threads before picking it up. 101 */ 102 continue; 103 } 104 105 req = QTAILQ_FIRST(&pool->request_list); 106 QTAILQ_REMOVE(&pool->request_list, req, reqs); 107 req->state = THREAD_ACTIVE; 108 qemu_mutex_unlock(&pool->lock); 109 110 ret = req->func(req->arg); 111 112 req->ret = ret; 113 /* Write ret before state. */ 114 smp_wmb(); 115 req->state = THREAD_DONE; 116 117 qemu_bh_schedule(pool->completion_bh); 118 qemu_mutex_lock(&pool->lock); 119 } 120 121 pool->cur_threads--; 122 qemu_cond_signal(&pool->worker_stopped); 123 124 /* 125 * Wake up another thread, in case we got a wakeup but decided 126 * to exit due to pool->cur_threads > pool->max_threads. 127 */ 128 qemu_cond_signal(&pool->request_cond); 129 qemu_mutex_unlock(&pool->lock); 130 return NULL; 131 } 132 133 static void do_spawn_thread(ThreadPool *pool) 134 { 135 QemuThread t; 136 137 /* Runs with lock taken. */ 138 if (!pool->new_threads) { 139 return; 140 } 141 142 pool->new_threads--; 143 pool->pending_threads++; 144 145 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); 146 } 147 148 static void spawn_thread_bh_fn(void *opaque) 149 { 150 ThreadPool *pool = opaque; 151 152 qemu_mutex_lock(&pool->lock); 153 do_spawn_thread(pool); 154 qemu_mutex_unlock(&pool->lock); 155 } 156 157 static void spawn_thread(ThreadPool *pool) 158 { 159 pool->cur_threads++; 160 pool->new_threads++; 161 /* If there are threads being created, they will spawn new workers, so 162 * we don't spend time creating many threads in a loop holding a mutex or 163 * starving the current vcpu. 164 * 165 * If there are no idle threads, ask the main thread to create one, so we 166 * inherit the correct affinity instead of the vcpu affinity. 167 */ 168 if (!pool->pending_threads) { 169 qemu_bh_schedule(pool->new_thread_bh); 170 } 171 } 172 173 static void thread_pool_completion_bh(void *opaque) 174 { 175 ThreadPool *pool = opaque; 176 ThreadPoolElement *elem, *next; 177 178 restart: 179 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { 180 if (elem->state != THREAD_DONE) { 181 continue; 182 } 183 184 trace_thread_pool_complete(pool, elem, elem->common.opaque, 185 elem->ret); 186 QLIST_REMOVE(elem, all); 187 188 if (elem->common.cb) { 189 /* Read state before ret. */ 190 smp_rmb(); 191 192 /* Schedule ourselves in case elem->common.cb() calls aio_poll() to 193 * wait for another request that completed at the same time. 194 */ 195 qemu_bh_schedule(pool->completion_bh); 196 197 elem->common.cb(elem->common.opaque, elem->ret); 198 199 /* We can safely cancel the completion_bh here regardless of someone 200 * else having scheduled it meanwhile because we reenter the 201 * completion function anyway (goto restart). 202 */ 203 qemu_bh_cancel(pool->completion_bh); 204 205 qemu_aio_unref(elem); 206 goto restart; 207 } else { 208 qemu_aio_unref(elem); 209 } 210 } 211 } 212 213 static void thread_pool_cancel(BlockAIOCB *acb) 214 { 215 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 216 ThreadPool *pool = elem->pool; 217 218 trace_thread_pool_cancel(elem, elem->common.opaque); 219 220 QEMU_LOCK_GUARD(&pool->lock); 221 if (elem->state == THREAD_QUEUED) { 222 QTAILQ_REMOVE(&pool->request_list, elem, reqs); 223 qemu_bh_schedule(pool->completion_bh); 224 225 elem->state = THREAD_DONE; 226 elem->ret = -ECANCELED; 227 } 228 229 } 230 231 static const AIOCBInfo thread_pool_aiocb_info = { 232 .aiocb_size = sizeof(ThreadPoolElement), 233 .cancel_async = thread_pool_cancel, 234 }; 235 236 BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, 237 BlockCompletionFunc *cb, void *opaque) 238 { 239 ThreadPoolElement *req; 240 AioContext *ctx = qemu_get_current_aio_context(); 241 ThreadPool *pool = aio_get_thread_pool(ctx); 242 243 /* Assert that the thread submitting work is the same running the pool */ 244 assert(pool->ctx == qemu_get_current_aio_context()); 245 246 req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); 247 req->func = func; 248 req->arg = arg; 249 req->state = THREAD_QUEUED; 250 req->pool = pool; 251 252 QLIST_INSERT_HEAD(&pool->head, req, all); 253 254 trace_thread_pool_submit(pool, req, arg); 255 256 qemu_mutex_lock(&pool->lock); 257 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { 258 spawn_thread(pool); 259 } 260 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); 261 qemu_mutex_unlock(&pool->lock); 262 qemu_cond_signal(&pool->request_cond); 263 return &req->common; 264 } 265 266 typedef struct ThreadPoolCo { 267 Coroutine *co; 268 int ret; 269 } ThreadPoolCo; 270 271 static void thread_pool_co_cb(void *opaque, int ret) 272 { 273 ThreadPoolCo *co = opaque; 274 275 co->ret = ret; 276 aio_co_wake(co->co); 277 } 278 279 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) 280 { 281 ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; 282 assert(qemu_in_coroutine()); 283 thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); 284 qemu_coroutine_yield(); 285 return tpc.ret; 286 } 287 288 void thread_pool_submit(ThreadPoolFunc *func, void *arg) 289 { 290 thread_pool_submit_aio(func, arg, NULL, NULL); 291 } 292 293 void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) 294 { 295 qemu_mutex_lock(&pool->lock); 296 297 pool->min_threads = ctx->thread_pool_min; 298 pool->max_threads = ctx->thread_pool_max; 299 300 /* 301 * We either have to: 302 * - Increase the number available of threads until over the min_threads 303 * threshold. 304 * - Bump the worker threads so that they exit, until under the max_threads 305 * threshold. 306 * - Do nothing. The current number of threads fall in between the min and 307 * max thresholds. We'll let the pool manage itself. 308 */ 309 for (int i = pool->cur_threads; i < pool->min_threads; i++) { 310 spawn_thread(pool); 311 } 312 313 for (int i = pool->cur_threads; i > pool->max_threads; i--) { 314 qemu_cond_signal(&pool->request_cond); 315 } 316 317 qemu_mutex_unlock(&pool->lock); 318 } 319 320 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) 321 { 322 if (!ctx) { 323 ctx = qemu_get_aio_context(); 324 } 325 326 memset(pool, 0, sizeof(*pool)); 327 pool->ctx = ctx; 328 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); 329 qemu_mutex_init(&pool->lock); 330 qemu_cond_init(&pool->worker_stopped); 331 qemu_cond_init(&pool->request_cond); 332 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); 333 334 QLIST_INIT(&pool->head); 335 QTAILQ_INIT(&pool->request_list); 336 337 thread_pool_update_params(pool, ctx); 338 } 339 340 ThreadPool *thread_pool_new(AioContext *ctx) 341 { 342 ThreadPool *pool = g_new(ThreadPool, 1); 343 thread_pool_init_one(pool, ctx); 344 return pool; 345 } 346 347 void thread_pool_free(ThreadPool *pool) 348 { 349 if (!pool) { 350 return; 351 } 352 353 assert(QLIST_EMPTY(&pool->head)); 354 355 qemu_mutex_lock(&pool->lock); 356 357 /* Stop new threads from spawning */ 358 qemu_bh_delete(pool->new_thread_bh); 359 pool->cur_threads -= pool->new_threads; 360 pool->new_threads = 0; 361 362 /* Wait for worker threads to terminate */ 363 pool->max_threads = 0; 364 qemu_cond_broadcast(&pool->request_cond); 365 while (pool->cur_threads > 0) { 366 qemu_cond_wait(&pool->worker_stopped, &pool->lock); 367 } 368 369 qemu_mutex_unlock(&pool->lock); 370 371 qemu_bh_delete(pool->completion_bh); 372 qemu_cond_destroy(&pool->request_cond); 373 qemu_cond_destroy(&pool->worker_stopped); 374 qemu_mutex_destroy(&pool->lock); 375 g_free(pool); 376 } 377