1from __future__ import print_function 2# 3# Migration test main engine 4# 5# Copyright (c) 2016 Red Hat, Inc. 6# 7# This library is free software; you can redistribute it and/or 8# modify it under the terms of the GNU Lesser General Public 9# License as published by the Free Software Foundation; either 10# version 2 of the License, or (at your option) any later version. 11# 12# This library is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 15# Lesser General Public License for more details. 16# 17# You should have received a copy of the GNU Lesser General Public 18# License along with this library; if not, see <http://www.gnu.org/licenses/>. 19# 20 21 22import os 23import re 24import sys 25import time 26 27from guestperf.progress import Progress, ProgressStats 28from guestperf.report import Report 29from guestperf.timings import TimingRecord, Timings 30 31sys.path.append(os.path.join(os.path.dirname(__file__), 32 '..', '..', '..', 'python')) 33import qemu 34 35 36class Engine(object): 37 38 def __init__(self, binary, dst_host, kernel, initrd, transport="tcp", 39 sleep=15, verbose=False, debug=False): 40 41 self._binary = binary # Path to QEMU binary 42 self._dst_host = dst_host # Hostname of target host 43 self._kernel = kernel # Path to kernel image 44 self._initrd = initrd # Path to stress initrd 45 self._transport = transport # 'unix' or 'tcp' or 'rdma' 46 self._sleep = sleep 47 self._verbose = verbose 48 self._debug = debug 49 50 if debug: 51 self._verbose = debug 52 53 def _vcpu_timing(self, pid, tid_list): 54 records = [] 55 now = time.time() 56 57 jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK']) 58 for tid in tid_list: 59 statfile = "/proc/%d/task/%d/stat" % (pid, tid) 60 with open(statfile, "r") as fh: 61 stat = fh.readline() 62 fields = stat.split(" ") 63 stime = int(fields[13]) 64 utime = int(fields[14]) 65 records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec)) 66 return records 67 68 def _cpu_timing(self, pid): 69 records = [] 70 now = time.time() 71 72 jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK']) 73 statfile = "/proc/%d/stat" % pid 74 with open(statfile, "r") as fh: 75 stat = fh.readline() 76 fields = stat.split(" ") 77 stime = int(fields[13]) 78 utime = int(fields[14]) 79 return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec) 80 81 def _migrate_progress(self, vm): 82 info = vm.command("query-migrate") 83 84 if "ram" not in info: 85 info["ram"] = {} 86 87 return Progress( 88 info.get("status", "active"), 89 ProgressStats( 90 info["ram"].get("transferred", 0), 91 info["ram"].get("remaining", 0), 92 info["ram"].get("total", 0), 93 info["ram"].get("duplicate", 0), 94 info["ram"].get("skipped", 0), 95 info["ram"].get("normal", 0), 96 info["ram"].get("normal-bytes", 0), 97 info["ram"].get("dirty-pages-rate", 0), 98 info["ram"].get("mbps", 0), 99 info["ram"].get("dirty-sync-count", 0) 100 ), 101 time.time(), 102 info.get("total-time", 0), 103 info.get("downtime", 0), 104 info.get("expected-downtime", 0), 105 info.get("setup-time", 0), 106 info.get("x-cpu-throttle-percentage", 0), 107 ) 108 109 def _migrate(self, hardware, scenario, src, dst, connect_uri): 110 src_qemu_time = [] 111 src_vcpu_time = [] 112 src_pid = src.get_pid() 113 114 vcpus = src.command("query-cpus") 115 src_threads = [] 116 for vcpu in vcpus: 117 src_threads.append(vcpu["thread_id"]) 118 119 # XXX how to get dst timings on remote host ? 120 121 if self._verbose: 122 print("Sleeping %d seconds for initial guest workload run" % self._sleep) 123 sleep_secs = self._sleep 124 while sleep_secs > 1: 125 src_qemu_time.append(self._cpu_timing(src_pid)) 126 src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads)) 127 time.sleep(1) 128 sleep_secs -= 1 129 130 if self._verbose: 131 print("Starting migration") 132 if scenario._auto_converge: 133 resp = src.command("migrate-set-capabilities", 134 capabilities = [ 135 { "capability": "auto-converge", 136 "state": True } 137 ]) 138 resp = src.command("migrate-set-parameters", 139 x_cpu_throttle_increment=scenario._auto_converge_step) 140 141 if scenario._post_copy: 142 resp = src.command("migrate-set-capabilities", 143 capabilities = [ 144 { "capability": "postcopy-ram", 145 "state": True } 146 ]) 147 resp = dst.command("migrate-set-capabilities", 148 capabilities = [ 149 { "capability": "postcopy-ram", 150 "state": True } 151 ]) 152 153 resp = src.command("migrate_set_speed", 154 value=scenario._bandwidth * 1024 * 1024) 155 156 resp = src.command("migrate_set_downtime", 157 value=scenario._downtime / 1024.0) 158 159 if scenario._compression_mt: 160 resp = src.command("migrate-set-capabilities", 161 capabilities = [ 162 { "capability": "compress", 163 "state": True } 164 ]) 165 resp = src.command("migrate-set-parameters", 166 compress_threads=scenario._compression_mt_threads) 167 resp = dst.command("migrate-set-capabilities", 168 capabilities = [ 169 { "capability": "compress", 170 "state": True } 171 ]) 172 resp = dst.command("migrate-set-parameters", 173 decompress_threads=scenario._compression_mt_threads) 174 175 if scenario._compression_xbzrle: 176 resp = src.command("migrate-set-capabilities", 177 capabilities = [ 178 { "capability": "xbzrle", 179 "state": True } 180 ]) 181 resp = dst.command("migrate-set-capabilities", 182 capabilities = [ 183 { "capability": "xbzrle", 184 "state": True } 185 ]) 186 resp = src.command("migrate-set-cache-size", 187 value=(hardware._mem * 1024 * 1024 * 1024 / 100 * 188 scenario._compression_xbzrle_cache)) 189 190 resp = src.command("migrate", uri=connect_uri) 191 192 post_copy = False 193 paused = False 194 195 progress_history = [] 196 197 start = time.time() 198 loop = 0 199 while True: 200 loop = loop + 1 201 time.sleep(0.05) 202 203 progress = self._migrate_progress(src) 204 if (loop % 20) == 0: 205 src_qemu_time.append(self._cpu_timing(src_pid)) 206 src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads)) 207 208 if (len(progress_history) == 0 or 209 (progress_history[-1]._ram._iterations < 210 progress._ram._iterations)): 211 progress_history.append(progress) 212 213 if progress._status in ("completed", "failed", "cancelled"): 214 if progress._status == "completed" and paused: 215 dst.command("cont") 216 if progress_history[-1] != progress: 217 progress_history.append(progress) 218 219 if progress._status == "completed": 220 if self._verbose: 221 print("Sleeping %d seconds for final guest workload run" % self._sleep) 222 sleep_secs = self._sleep 223 while sleep_secs > 1: 224 time.sleep(1) 225 src_qemu_time.append(self._cpu_timing(src_pid)) 226 src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads)) 227 sleep_secs -= 1 228 229 return [progress_history, src_qemu_time, src_vcpu_time] 230 231 if self._verbose and (loop % 20) == 0: 232 print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % ( 233 progress._ram._iterations, 234 progress._ram._remaining_bytes / (1024 * 1024), 235 progress._ram._total_bytes / (1024 * 1024), 236 progress._ram._transferred_bytes / (1024 * 1024), 237 progress._ram._transfer_rate_mbs, 238 )) 239 240 if progress._ram._iterations > scenario._max_iters: 241 if self._verbose: 242 print("No completion after %d iterations over RAM" % scenario._max_iters) 243 src.command("migrate_cancel") 244 continue 245 246 if time.time() > (start + scenario._max_time): 247 if self._verbose: 248 print("No completion after %d seconds" % scenario._max_time) 249 src.command("migrate_cancel") 250 continue 251 252 if (scenario._post_copy and 253 progress._ram._iterations >= scenario._post_copy_iters and 254 not post_copy): 255 if self._verbose: 256 print("Switching to post-copy after %d iterations" % scenario._post_copy_iters) 257 resp = src.command("migrate-start-postcopy") 258 post_copy = True 259 260 if (scenario._pause and 261 progress._ram._iterations >= scenario._pause_iters and 262 not paused): 263 if self._verbose: 264 print("Pausing VM after %d iterations" % scenario._pause_iters) 265 resp = src.command("stop") 266 paused = True 267 268 def _get_common_args(self, hardware, tunnelled=False): 269 args = [ 270 "noapic", 271 "edd=off", 272 "printk.time=1", 273 "noreplace-smp", 274 "cgroup_disable=memory", 275 "pci=noearly", 276 "console=ttyS0", 277 ] 278 if self._debug: 279 args.append("debug") 280 else: 281 args.append("quiet") 282 283 args.append("ramsize=%s" % hardware._mem) 284 285 cmdline = " ".join(args) 286 if tunnelled: 287 cmdline = "'" + cmdline + "'" 288 289 argv = [ 290 "-machine", "accel=kvm", 291 "-cpu", "host", 292 "-kernel", self._kernel, 293 "-initrd", self._initrd, 294 "-append", cmdline, 295 "-chardev", "stdio,id=cdev0", 296 "-device", "isa-serial,chardev=cdev0", 297 "-m", str((hardware._mem * 1024) + 512), 298 "-smp", str(hardware._cpus), 299 ] 300 301 if self._debug: 302 argv.extend(["-device", "sga"]) 303 304 if hardware._prealloc_pages: 305 argv_source += ["-mem-path", "/dev/shm", 306 "-mem-prealloc"] 307 if hardware._locked_pages: 308 argv_source += ["-realtime", "mlock=on"] 309 if hardware._huge_pages: 310 pass 311 312 return argv 313 314 def _get_src_args(self, hardware): 315 return self._get_common_args(hardware) 316 317 def _get_dst_args(self, hardware, uri): 318 tunnelled = False 319 if self._dst_host != "localhost": 320 tunnelled = True 321 argv = self._get_common_args(hardware, tunnelled) 322 return argv + ["-incoming", uri] 323 324 @staticmethod 325 def _get_common_wrapper(cpu_bind, mem_bind): 326 wrapper = [] 327 if len(cpu_bind) > 0 or len(mem_bind) > 0: 328 wrapper.append("numactl") 329 if cpu_bind: 330 wrapper.append("--physcpubind=%s" % ",".join(cpu_bind)) 331 if mem_bind: 332 wrapper.append("--membind=%s" % ",".join(mem_bind)) 333 334 return wrapper 335 336 def _get_src_wrapper(self, hardware): 337 return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind) 338 339 def _get_dst_wrapper(self, hardware): 340 wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind) 341 if self._dst_host != "localhost": 342 return ["ssh", 343 "-R", "9001:localhost:9001", 344 self._dst_host] + wrapper 345 else: 346 return wrapper 347 348 def _get_timings(self, vm): 349 log = vm.get_log() 350 if not log: 351 return [] 352 if self._debug: 353 print(log) 354 355 regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms" 356 matcher = re.compile(regex) 357 records = [] 358 for line in log.split("\n"): 359 match = matcher.match(line) 360 if match: 361 records.append(TimingRecord(int(match.group(1)), 362 int(match.group(2)) / 1000.0, 363 int(match.group(3)))) 364 return records 365 366 def run(self, hardware, scenario, result_dir=os.getcwd()): 367 abs_result_dir = os.path.join(result_dir, scenario._name) 368 369 if self._transport == "tcp": 370 uri = "tcp:%s:9000" % self._dst_host 371 elif self._transport == "rdma": 372 uri = "rdma:%s:9000" % self._dst_host 373 elif self._transport == "unix": 374 if self._dst_host != "localhost": 375 raise Exception("Running use unix migration transport for non-local host") 376 uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid() 377 try: 378 os.remove(uri[5:]) 379 os.remove(monaddr) 380 except: 381 pass 382 383 if self._dst_host != "localhost": 384 dstmonaddr = ("localhost", 9001) 385 else: 386 dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid() 387 srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid() 388 389 src = qemu.QEMUMachine(self._binary, 390 args=self._get_src_args(hardware), 391 wrapper=self._get_src_wrapper(hardware), 392 name="qemu-src-%d" % os.getpid(), 393 monitor_address=srcmonaddr) 394 395 dst = qemu.QEMUMachine(self._binary, 396 args=self._get_dst_args(hardware, uri), 397 wrapper=self._get_dst_wrapper(hardware), 398 name="qemu-dst-%d" % os.getpid(), 399 monitor_address=dstmonaddr) 400 401 try: 402 src.launch() 403 dst.launch() 404 405 ret = self._migrate(hardware, scenario, src, dst, uri) 406 progress_history = ret[0] 407 qemu_timings = ret[1] 408 vcpu_timings = ret[2] 409 if uri[0:5] == "unix:": 410 os.remove(uri[5:]) 411 if self._verbose: 412 print("Finished migration") 413 414 src.shutdown() 415 dst.shutdown() 416 417 return Report(hardware, scenario, progress_history, 418 Timings(self._get_timings(src) + self._get_timings(dst)), 419 Timings(qemu_timings), 420 Timings(vcpu_timings), 421 self._binary, self._dst_host, self._kernel, 422 self._initrd, self._transport, self._sleep) 423 except Exception as e: 424 if self._debug: 425 print("Failed: %s" % str(e)) 426 try: 427 src.shutdown() 428 except: 429 pass 430 try: 431 dst.shutdown() 432 except: 433 pass 434 435 if self._debug: 436 print(src.get_log()) 437 print(dst.get_log()) 438 raise 439 440