/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori <aliguori@us.ibm.com>
 *  Paolo Bonzini <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext.  */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    bool stopping;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);

        qemu_bh_schedule(pool->completion_bh);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}
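/*
 * A sketch of the barrier pairing used for completed requests, derived from
 * the smp_wmb() above and the matching smp_rmb() in
 * thread_pool_completion_bh() below; "use()" is an illustrative placeholder,
 * not code in this file:
 *
 *     worker thread (publish)          completion BH (consume)
 *     -----------------------          ---------------------------------
 *     req->ret = ret;
 *     smp_wmb();                       if (elem->state == THREAD_DONE) {
 *     req->state = THREAD_DONE;            smp_rmb();
 *                                          use(elem->ret);
 *                                      }
 *
 * The pairing lets the completion side read ret without holding pool->lock
 * once it has observed state == THREAD_DONE.
 */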
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

    aio_context_acquire(pool->ctx);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            aio_context_release(pool->ctx);
            elem->common.cb(elem->common.opaque, elem->ret);
            aio_context_acquire(pool->ctx);

            /* We can safely cancel the completion_bh here regardless of someone
             * else having scheduled it meanwhile because we reenter the
             * completion function anyway (goto restart).
             */
            qemu_bh_cancel(pool->completion_bh);

            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
    aio_context_release(pool->ctx);
}

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem.  We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}
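/*
 * A minimal cancellation sketch, assuming the generic AIOCB path: callers
 * keep the BlockAIOCB returned by thread_pool_submit_aio() and cancel it
 * with bdrv_aio_cancel_async(), which invokes the .cancel_async hook
 * (thread_pool_cancel above) for this AIOCB type.  The names fn, arg, cb
 * and opaque below are hypothetical:
 *
 *     BlockAIOCB *acb = thread_pool_submit_aio(pool, fn, arg, cb, opaque);
 *     ...
 *     bdrv_aio_cancel_async(acb);
 *     // If the request was still THREAD_QUEUED, cb runs with -ECANCELED
 *     // from the completion BH; otherwise the worker finishes normally.
 */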
static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size = sizeof(ThreadPoolElement),
    .cancel_async = thread_pool_cancel,
    .get_aio_context = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
                                   ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    aio_co_wake(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}
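/*
 * Coroutine usage sketch: thread_pool_submit_co() yields until the worker
 * function has run and returns its result.  flush_worker() and fd are
 * hypothetical, shown only to illustrate the calling convention (return 0
 * on success or a negative errno):
 *
 *     static int flush_worker(void *opaque)
 *     {
 *         int fd = *(int *)opaque;
 *         return fdatasync(fd) == 0 ? 0 : -errno;
 *     }
 *
 *     // from coroutine context:
 *     ThreadPool *pool = thread_pool_new(ctx);
 *     int ret = thread_pool_submit_co(pool, flush_worker, &fd);
 */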
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
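/*
 * Teardown sketch: thread_pool_free() asserts that pool->head is empty, so
 * every submitted request must have completed (and its callback run) before
 * the pool is destroyed.  A caller on the pool's AioContext might drain
 * with aio_poll(); in_flight is a hypothetical caller-side counter:
 *
 *     while (in_flight > 0) {
 *         aio_poll(ctx, true);
 *     }
 *     thread_pool_free(pool);
 */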