/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
};

struct ThreadPoolElement {
    BlockAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    AioContext *ctx;
    QEMUBH *completion_bh;
    QemuMutex lock;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    bool stopping;
};

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);

        qemu_bh_schedule(pool->completion_bh);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}
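
/* A sketch of the publish/consume ordering used by the worker above and
 * by thread_pool_completion_bh() below.  The worker separates the two
 * stores with a write barrier, and the reader mirrors it with a read
 * barrier between the two loads, so a reader that observes THREAD_DONE
 * is guaranteed to also observe the final ret:
 *
 *     worker thread                    completion BH
 *     ---------------------------      ---------------------------
 *     req->ret = ret;
 *     smp_wmb();                       if (elem->state == THREAD_DONE) {
 *     req->state = THREAD_DONE;            smp_rmb();
 *                                          ... read elem->ret ...
 *                                      }
 */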

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, "worker", worker_thread, pool,
                       QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
{
    ThreadPool *pool = opaque;
    ThreadPoolElement *elem, *next;

restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_DONE) {
            continue;
        }

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
            /* Read state before ret.  */
            smp_rmb();

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

            elem->common.cb(elem->common.opaque, elem->ret);
            qemu_aio_unref(elem);
            goto restart;
        } else {
            qemu_aio_unref(elem);
        }
    }
}
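
/* A minimal sketch of a ThreadPoolFunc as the pool consumes it.  The
 * FsyncJob/fsync_job names are hypothetical, not part of this file:
 *
 *     typedef struct FsyncJob { int fd; } FsyncJob;
 *
 *     static int fsync_job(void *opaque)
 *     {
 *         FsyncJob *job = opaque;
 *         // runs in a worker thread, so blocking here is fine
 *         return fsync(job->fd) == 0 ? 0 : -errno;
 *     }
 *
 * The return value is stored into req->ret by the worker and later
 * handed to the completion callback (or returned by
 * thread_pool_submit_co()).
 */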

static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem.  We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }

    qemu_mutex_unlock(&pool->lock);
}

static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    qemu_coroutine_enter(co->co);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
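
/* A usage sketch for the coroutine wrapper above.  It reuses the
 * hypothetical fsync_job()/FsyncJob from the sketch earlier in this
 * file; flush_co is likewise an invented name:
 *
 *     static int coroutine_fn flush_co(ThreadPool *pool, int fd)
 *     {
 *         FsyncJob job = { .fd = fd };
 *
 *         // yields here; thread_pool_co_cb() re-enters the coroutine
 *         // once a worker has run fsync_job() and stored its result
 *         return thread_pool_submit_co(pool, fsync_job, &job);
 *     }
 *
 * Keeping job on the coroutine stack is safe because the coroutine does
 * not resume until the request has completed, mirroring how tpc lives
 * on the stack inside thread_pool_submit_co() itself.
 */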

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}
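
/* A lifecycle sketch under the same hypothetical names as the earlier
 * sketches.  thread_pool_free() asserts that pool->head is empty, so
 * every submitted request must have completed and had its completion BH
 * run before the pool is torn down:
 *
 *     ThreadPool *pool = thread_pool_new(ctx);
 *
 *     thread_pool_submit(pool, fsync_job, &job);   // no completion cb
 *     ...                                          // drain, e.g. via aio_poll()
 *     thread_pool_free(pool);                      // waits for workers to stop
 */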