1 /* 2 * QEMU I/O task 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/task.h" 23 #include "qapi/error.h" 24 #include "qemu/thread.h" 25 #include "trace.h" 26 27 struct QIOTaskThreadData { 28 QIOTaskWorker worker; 29 gpointer opaque; 30 GDestroyNotify destroy; 31 GMainContext *context; 32 GSource *completion; 33 }; 34 35 36 struct QIOTask { 37 Object *source; 38 QIOTaskFunc func; 39 gpointer opaque; 40 GDestroyNotify destroy; 41 Error *err; 42 gpointer result; 43 GDestroyNotify destroyResult; 44 QemuMutex thread_lock; 45 QemuCond thread_cond; 46 struct QIOTaskThreadData *thread; 47 }; 48 49 50 QIOTask *qio_task_new(Object *source, 51 QIOTaskFunc func, 52 gpointer opaque, 53 GDestroyNotify destroy) 54 { 55 QIOTask *task; 56 57 task = g_new0(QIOTask, 1); 58 59 task->source = source; 60 object_ref(source); 61 task->func = func; 62 task->opaque = opaque; 63 task->destroy = destroy; 64 qemu_mutex_init(&task->thread_lock); 65 qemu_cond_init(&task->thread_cond); 66 67 trace_qio_task_new(task, source, func, opaque); 68 69 return task; 70 } 71 72 static void qio_task_free(QIOTask *task) 73 { 74 qemu_mutex_lock(&task->thread_lock); 75 if (task->thread) { 76 if (task->thread->destroy) { 77 task->thread->destroy(task->thread->opaque); 78 } 79 80 if (task->thread->context) { 81 g_main_context_unref(task->thread->context); 82 } 83 84 g_free(task->thread); 85 } 86 87 if (task->destroy) { 88 task->destroy(task->opaque); 89 } 90 if (task->destroyResult) { 91 task->destroyResult(task->result); 92 } 93 if (task->err) { 94 error_free(task->err); 95 } 96 object_unref(task->source); 97 98 qemu_mutex_unlock(&task->thread_lock); 99 qemu_mutex_destroy(&task->thread_lock); 100 qemu_cond_destroy(&task->thread_cond); 101 102 g_free(task); 103 } 104 105 106 static gboolean qio_task_thread_result(gpointer opaque) 107 { 108 QIOTask *task = opaque; 109 110 trace_qio_task_thread_result(task); 111 qio_task_complete(task); 112 113 return FALSE; 114 } 115 116 117 static gpointer qio_task_thread_worker(gpointer opaque) 118 { 119 QIOTask *task = opaque; 120 121 trace_qio_task_thread_run(task); 122 123 task->thread->worker(task, task->thread->opaque); 124 125 /* We're running in the background thread, and must only 126 * ever report the task results in the main event loop 127 * thread. So we schedule an idle callback to report 128 * the worker results 129 */ 130 trace_qio_task_thread_exit(task); 131 132 qemu_mutex_lock(&task->thread_lock); 133 134 task->thread->completion = g_idle_source_new(); 135 g_source_set_callback(task->thread->completion, 136 qio_task_thread_result, task, NULL); 137 g_source_attach(task->thread->completion, 138 task->thread->context); 139 g_source_unref(task->thread->completion); 140 trace_qio_task_thread_source_attach(task, task->thread->completion); 141 142 qemu_cond_signal(&task->thread_cond); 143 qemu_mutex_unlock(&task->thread_lock); 144 145 return NULL; 146 } 147 148 149 void qio_task_run_in_thread(QIOTask *task, 150 QIOTaskWorker worker, 151 gpointer opaque, 152 GDestroyNotify destroy, 153 GMainContext *context) 154 { 155 struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1); 156 QemuThread thread; 157 158 if (context) { 159 g_main_context_ref(context); 160 } 161 162 data->worker = worker; 163 data->opaque = opaque; 164 data->destroy = destroy; 165 data->context = context; 166 167 task->thread = data; 168 169 trace_qio_task_thread_start(task, worker, opaque); 170 qemu_thread_create(&thread, 171 "io-task-worker", 172 qio_task_thread_worker, 173 task, 174 QEMU_THREAD_DETACHED); 175 } 176 177 178 void qio_task_wait_thread(QIOTask *task) 179 { 180 qemu_mutex_lock(&task->thread_lock); 181 g_assert(task->thread != NULL); 182 while (task->thread->completion == NULL) { 183 qemu_cond_wait(&task->thread_cond, &task->thread_lock); 184 } 185 186 trace_qio_task_thread_source_cancel(task, task->thread->completion); 187 g_source_destroy(task->thread->completion); 188 qemu_mutex_unlock(&task->thread_lock); 189 190 qio_task_thread_result(task); 191 } 192 193 194 void qio_task_complete(QIOTask *task) 195 { 196 task->func(task, task->opaque); 197 trace_qio_task_complete(task); 198 qio_task_free(task); 199 } 200 201 202 void qio_task_set_error(QIOTask *task, 203 Error *err) 204 { 205 error_propagate(&task->err, err); 206 } 207 208 209 bool qio_task_propagate_error(QIOTask *task, 210 Error **errp) 211 { 212 if (task->err) { 213 error_propagate(errp, task->err); 214 task->err = NULL; 215 return true; 216 } 217 218 return false; 219 } 220 221 222 void qio_task_set_result_pointer(QIOTask *task, 223 gpointer result, 224 GDestroyNotify destroy) 225 { 226 task->result = result; 227 task->destroyResult = destroy; 228 } 229 230 231 gpointer qio_task_get_result_pointer(QIOTask *task) 232 { 233 return task->result; 234 } 235 236 237 Object *qio_task_get_source(QIOTask *task) 238 { 239 return task->source; 240 } 241