1 /* 2 * QEMU I/O task 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/task.h" 23 #include "qapi/error.h" 24 #include "qemu/thread.h" 25 #include "trace.h" 26 27 struct QIOTaskThreadData { 28 QIOTaskWorker worker; 29 gpointer opaque; 30 GDestroyNotify destroy; 31 GMainContext *context; 32 GSource *completion; 33 }; 34 35 36 struct QIOTask { 37 Object *source; 38 QIOTaskFunc func; 39 gpointer opaque; 40 GDestroyNotify destroy; 41 Error *err; 42 gpointer result; 43 GDestroyNotify destroyResult; 44 QemuMutex thread_lock; 45 QemuCond thread_cond; 46 struct QIOTaskThreadData *thread; 47 }; 48 49 50 QIOTask *qio_task_new(Object *source, 51 QIOTaskFunc func, 52 gpointer opaque, 53 GDestroyNotify destroy) 54 { 55 QIOTask *task; 56 57 task = g_new0(QIOTask, 1); 58 59 task->source = source; 60 object_ref(source); 61 task->func = func; 62 task->opaque = opaque; 63 task->destroy = destroy; 64 qemu_mutex_init(&task->thread_lock); 65 qemu_cond_init(&task->thread_cond); 66 67 trace_qio_task_new(task, source, func, opaque); 68 69 return task; 70 } 71 72 static void qio_task_free(QIOTask *task) 73 { 74 qemu_mutex_lock(&task->thread_lock); 75 if (task->thread) { 76 if (task->thread->destroy) { 77 task->thread->destroy(task->thread->opaque); 78 } 79 80 if (task->thread->context) { 81 g_main_context_unref(task->thread->context); 82 } 83 84 g_free(task->thread); 85 } 86 87 if (task->destroy) { 88 task->destroy(task->opaque); 89 } 90 if (task->destroyResult) { 91 task->destroyResult(task->result); 92 } 93 if (task->err) { 94 error_free(task->err); 95 } 96 object_unref(task->source); 97 98 qemu_mutex_unlock(&task->thread_lock); 99 qemu_mutex_destroy(&task->thread_lock); 100 qemu_cond_destroy(&task->thread_cond); 101 102 g_free(task); 103 } 104 105 106 static gboolean qio_task_thread_result(gpointer opaque) 107 { 108 QIOTask *task = opaque; 109 110 trace_qio_task_thread_result(task); 111 qio_task_complete(task); 112 113 return FALSE; 114 } 115 116 117 static gpointer qio_task_thread_worker(gpointer opaque) 118 { 119 QIOTask *task = opaque; 120 121 trace_qio_task_thread_run(task); 122 123 task->thread->worker(task, task->thread->opaque); 124 125 /* We're running in the background thread, and must only 126 * ever report the task results in the main event loop 127 * thread. So we schedule an idle callback to report 128 * the worker results 129 */ 130 trace_qio_task_thread_exit(task); 131 132 qemu_mutex_lock(&task->thread_lock); 133 134 task->thread->completion = g_idle_source_new(); 135 g_source_set_callback(task->thread->completion, 136 qio_task_thread_result, task, NULL); 137 g_source_attach(task->thread->completion, 138 task->thread->context); 139 trace_qio_task_thread_source_attach(task, task->thread->completion); 140 141 qemu_cond_signal(&task->thread_cond); 142 qemu_mutex_unlock(&task->thread_lock); 143 144 return NULL; 145 } 146 147 148 void qio_task_run_in_thread(QIOTask *task, 149 QIOTaskWorker worker, 150 gpointer opaque, 151 GDestroyNotify destroy, 152 GMainContext *context) 153 { 154 struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1); 155 QemuThread thread; 156 157 if (context) { 158 g_main_context_ref(context); 159 } 160 161 data->worker = worker; 162 data->opaque = opaque; 163 data->destroy = destroy; 164 data->context = context; 165 166 task->thread = data; 167 168 trace_qio_task_thread_start(task, worker, opaque); 169 qemu_thread_create(&thread, 170 "io-task-worker", 171 qio_task_thread_worker, 172 task, 173 QEMU_THREAD_DETACHED); 174 } 175 176 177 void qio_task_wait_thread(QIOTask *task) 178 { 179 qemu_mutex_lock(&task->thread_lock); 180 g_assert(task->thread != NULL); 181 while (task->thread->completion == NULL) { 182 qemu_cond_wait(&task->thread_cond, &task->thread_lock); 183 } 184 185 trace_qio_task_thread_source_cancel(task, task->thread->completion); 186 g_source_destroy(task->thread->completion); 187 qemu_mutex_unlock(&task->thread_lock); 188 189 qio_task_thread_result(task); 190 } 191 192 193 void qio_task_complete(QIOTask *task) 194 { 195 task->func(task, task->opaque); 196 trace_qio_task_complete(task); 197 qio_task_free(task); 198 } 199 200 201 void qio_task_set_error(QIOTask *task, 202 Error *err) 203 { 204 error_propagate(&task->err, err); 205 } 206 207 208 bool qio_task_propagate_error(QIOTask *task, 209 Error **errp) 210 { 211 if (task->err) { 212 error_propagate(errp, task->err); 213 task->err = NULL; 214 return true; 215 } 216 217 return false; 218 } 219 220 221 void qio_task_set_result_pointer(QIOTask *task, 222 gpointer result, 223 GDestroyNotify destroy) 224 { 225 task->result = result; 226 task->destroyResult = destroy; 227 } 228 229 230 gpointer qio_task_get_result_pointer(QIOTask *task) 231 { 232 return task->result; 233 } 234 235 236 Object *qio_task_get_source(QIOTask *task) 237 { 238 return task->source; 239 } 240