1 /* 2 * QEMU block layer thread pool 3 * 4 * Copyright IBM, Corp. 2008 5 * Copyright Red Hat, Inc. 2012 6 * 7 * Authors: 8 * Anthony Liguori <aliguori@us.ibm.com> 9 * Paolo Bonzini <pbonzini@redhat.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2. See 12 * the COPYING file in the top-level directory. 13 * 14 * Contributions after 2012-01-13 are licensed under the terms of the 15 * GNU GPL, version 2 or (at your option) any later version. 16 */ 17 #include "qemu/osdep.h" 18 #include "qemu-common.h" 19 #include "qemu/queue.h" 20 #include "qemu/thread.h" 21 #include "qemu/coroutine.h" 22 #include "trace.h" 23 #include "block/thread-pool.h" 24 #include "qemu/main-loop.h" 25 26 static void do_spawn_thread(ThreadPool *pool); 27 28 typedef struct ThreadPoolElement ThreadPoolElement; 29 30 enum ThreadState { 31 THREAD_QUEUED, 32 THREAD_ACTIVE, 33 THREAD_DONE, 34 }; 35 36 struct ThreadPoolElement { 37 BlockAIOCB common; 38 ThreadPool *pool; 39 ThreadPoolFunc *func; 40 void *arg; 41 42 /* Moving state out of THREAD_QUEUED is protected by lock. After 43 * that, only the worker thread can write to it. Reads and writes 44 * of state and ret are ordered with memory barriers. 45 */ 46 enum ThreadState state; 47 int ret; 48 49 /* Access to this list is protected by lock. */ 50 QTAILQ_ENTRY(ThreadPoolElement) reqs; 51 52 /* Access to this list is protected by the global mutex. */ 53 QLIST_ENTRY(ThreadPoolElement) all; 54 }; 55 56 struct ThreadPool { 57 AioContext *ctx; 58 QEMUBH *completion_bh; 59 QemuMutex lock; 60 QemuCond worker_stopped; 61 QemuSemaphore sem; 62 int max_threads; 63 QEMUBH *new_thread_bh; 64 65 /* The following variables are only accessed from one AioContext. */ 66 QLIST_HEAD(, ThreadPoolElement) head; 67 68 /* The following variables are protected by lock. */ 69 QTAILQ_HEAD(, ThreadPoolElement) request_list; 70 int cur_threads; 71 int idle_threads; 72 int new_threads; /* backlog of threads we need to create */ 73 int pending_threads; /* threads created but not running yet */ 74 bool stopping; 75 }; 76 77 static void *worker_thread(void *opaque) 78 { 79 ThreadPool *pool = opaque; 80 81 qemu_mutex_lock(&pool->lock); 82 pool->pending_threads--; 83 do_spawn_thread(pool); 84 85 while (!pool->stopping) { 86 ThreadPoolElement *req; 87 int ret; 88 89 do { 90 pool->idle_threads++; 91 qemu_mutex_unlock(&pool->lock); 92 ret = qemu_sem_timedwait(&pool->sem, 10000); 93 qemu_mutex_lock(&pool->lock); 94 pool->idle_threads--; 95 } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); 96 if (ret == -1 || pool->stopping) { 97 break; 98 } 99 100 req = QTAILQ_FIRST(&pool->request_list); 101 QTAILQ_REMOVE(&pool->request_list, req, reqs); 102 req->state = THREAD_ACTIVE; 103 qemu_mutex_unlock(&pool->lock); 104 105 ret = req->func(req->arg); 106 107 req->ret = ret; 108 /* Write ret before state. */ 109 smp_wmb(); 110 req->state = THREAD_DONE; 111 112 qemu_mutex_lock(&pool->lock); 113 114 qemu_bh_schedule(pool->completion_bh); 115 } 116 117 pool->cur_threads--; 118 qemu_cond_signal(&pool->worker_stopped); 119 qemu_mutex_unlock(&pool->lock); 120 return NULL; 121 } 122 123 static void do_spawn_thread(ThreadPool *pool) 124 { 125 QemuThread t; 126 127 /* Runs with lock taken. */ 128 if (!pool->new_threads) { 129 return; 130 } 131 132 pool->new_threads--; 133 pool->pending_threads++; 134 135 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); 136 } 137 138 static void spawn_thread_bh_fn(void *opaque) 139 { 140 ThreadPool *pool = opaque; 141 142 qemu_mutex_lock(&pool->lock); 143 do_spawn_thread(pool); 144 qemu_mutex_unlock(&pool->lock); 145 } 146 147 static void spawn_thread(ThreadPool *pool) 148 { 149 pool->cur_threads++; 150 pool->new_threads++; 151 /* If there are threads being created, they will spawn new workers, so 152 * we don't spend time creating many threads in a loop holding a mutex or 153 * starving the current vcpu. 154 * 155 * If there are no idle threads, ask the main thread to create one, so we 156 * inherit the correct affinity instead of the vcpu affinity. 157 */ 158 if (!pool->pending_threads) { 159 qemu_bh_schedule(pool->new_thread_bh); 160 } 161 } 162 163 static void thread_pool_completion_bh(void *opaque) 164 { 165 ThreadPool *pool = opaque; 166 ThreadPoolElement *elem, *next; 167 168 aio_context_acquire(pool->ctx); 169 restart: 170 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { 171 if (elem->state != THREAD_DONE) { 172 continue; 173 } 174 175 trace_thread_pool_complete(pool, elem, elem->common.opaque, 176 elem->ret); 177 QLIST_REMOVE(elem, all); 178 179 if (elem->common.cb) { 180 /* Read state before ret. */ 181 smp_rmb(); 182 183 /* Schedule ourselves in case elem->common.cb() calls aio_poll() to 184 * wait for another request that completed at the same time. 185 */ 186 qemu_bh_schedule(pool->completion_bh); 187 188 aio_context_release(pool->ctx); 189 elem->common.cb(elem->common.opaque, elem->ret); 190 aio_context_acquire(pool->ctx); 191 192 /* We can safely cancel the completion_bh here regardless of someone 193 * else having scheduled it meanwhile because we reenter the 194 * completion function anyway (goto restart). 195 */ 196 qemu_bh_cancel(pool->completion_bh); 197 198 qemu_aio_unref(elem); 199 goto restart; 200 } else { 201 qemu_aio_unref(elem); 202 } 203 } 204 aio_context_release(pool->ctx); 205 } 206 207 static void thread_pool_cancel(BlockAIOCB *acb) 208 { 209 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 210 ThreadPool *pool = elem->pool; 211 212 trace_thread_pool_cancel(elem, elem->common.opaque); 213 214 qemu_mutex_lock(&pool->lock); 215 if (elem->state == THREAD_QUEUED && 216 /* No thread has yet started working on elem. we can try to "steal" 217 * the item from the worker if we can get a signal from the 218 * semaphore. Because this is non-blocking, we can do it with 219 * the lock taken and ensure that elem will remain THREAD_QUEUED. 220 */ 221 qemu_sem_timedwait(&pool->sem, 0) == 0) { 222 QTAILQ_REMOVE(&pool->request_list, elem, reqs); 223 qemu_bh_schedule(pool->completion_bh); 224 225 elem->state = THREAD_DONE; 226 elem->ret = -ECANCELED; 227 } 228 229 qemu_mutex_unlock(&pool->lock); 230 } 231 232 static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb) 233 { 234 ThreadPoolElement *elem = (ThreadPoolElement *)acb; 235 ThreadPool *pool = elem->pool; 236 return pool->ctx; 237 } 238 239 static const AIOCBInfo thread_pool_aiocb_info = { 240 .aiocb_size = sizeof(ThreadPoolElement), 241 .cancel_async = thread_pool_cancel, 242 .get_aio_context = thread_pool_get_aio_context, 243 }; 244 245 BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, 246 ThreadPoolFunc *func, void *arg, 247 BlockCompletionFunc *cb, void *opaque) 248 { 249 ThreadPoolElement *req; 250 251 req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); 252 req->func = func; 253 req->arg = arg; 254 req->state = THREAD_QUEUED; 255 req->pool = pool; 256 257 QLIST_INSERT_HEAD(&pool->head, req, all); 258 259 trace_thread_pool_submit(pool, req, arg); 260 261 qemu_mutex_lock(&pool->lock); 262 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { 263 spawn_thread(pool); 264 } 265 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); 266 qemu_mutex_unlock(&pool->lock); 267 qemu_sem_post(&pool->sem); 268 return &req->common; 269 } 270 271 typedef struct ThreadPoolCo { 272 Coroutine *co; 273 int ret; 274 } ThreadPoolCo; 275 276 static void thread_pool_co_cb(void *opaque, int ret) 277 { 278 ThreadPoolCo *co = opaque; 279 280 co->ret = ret; 281 aio_co_wake(co->co); 282 } 283 284 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, 285 void *arg) 286 { 287 ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; 288 assert(qemu_in_coroutine()); 289 thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); 290 qemu_coroutine_yield(); 291 return tpc.ret; 292 } 293 294 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) 295 { 296 thread_pool_submit_aio(pool, func, arg, NULL, NULL); 297 } 298 299 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) 300 { 301 if (!ctx) { 302 ctx = qemu_get_aio_context(); 303 } 304 305 memset(pool, 0, sizeof(*pool)); 306 pool->ctx = ctx; 307 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); 308 qemu_mutex_init(&pool->lock); 309 qemu_cond_init(&pool->worker_stopped); 310 qemu_sem_init(&pool->sem, 0); 311 pool->max_threads = 64; 312 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); 313 314 QLIST_INIT(&pool->head); 315 QTAILQ_INIT(&pool->request_list); 316 } 317 318 ThreadPool *thread_pool_new(AioContext *ctx) 319 { 320 ThreadPool *pool = g_new(ThreadPool, 1); 321 thread_pool_init_one(pool, ctx); 322 return pool; 323 } 324 325 void thread_pool_free(ThreadPool *pool) 326 { 327 if (!pool) { 328 return; 329 } 330 331 assert(QLIST_EMPTY(&pool->head)); 332 333 qemu_mutex_lock(&pool->lock); 334 335 /* Stop new threads from spawning */ 336 qemu_bh_delete(pool->new_thread_bh); 337 pool->cur_threads -= pool->new_threads; 338 pool->new_threads = 0; 339 340 /* Wait for worker threads to terminate */ 341 pool->stopping = true; 342 while (pool->cur_threads > 0) { 343 qemu_sem_post(&pool->sem); 344 qemu_cond_wait(&pool->worker_stopped, &pool->lock); 345 } 346 347 qemu_mutex_unlock(&pool->lock); 348 349 qemu_bh_delete(pool->completion_bh); 350 qemu_sem_destroy(&pool->sem); 351 qemu_cond_destroy(&pool->worker_stopped); 352 qemu_mutex_destroy(&pool->lock); 353 g_free(pool); 354 } 355