#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#


import os
import re
import sys
import time

from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report
from guestperf.timings import TimingRecord, Timings

sys.path.append(os.path.join(os.path.dirname(__file__),
                             '..', '..', '..', 'python'))
from qemu.machine import QEMUMachine


class Engine(object):
    """Launch a source and a destination QEMU and drive a migration.

    The engine starts two QEMUMachine instances, configures migration
    capabilities/parameters from a scenario object, polls migration
    progress while sampling host CPU usage of the source process, and
    finally packages everything into a Report.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the test configuration.

        Enabling ``debug`` also enables ``verbose``.
        """
        self._binary = binary  # Path to QEMU binary
        self._dst_host = dst_host  # Hostname of target host
        self._kernel = kernel  # Path to kernel image
        self._initrd = initrd  # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        if debug:
            self._verbose = debug

    @staticmethod
    def _read_cpu_ticks(statfile):
        """Return utime + stime (in clock ticks) from a /proc stat file.

        The second field of the stat line is the command name wrapped in
        parentheses and may itself contain spaces, so the line is split
        after the *last* ')' rather than naively on every space.  The
        remaining fields start at the process state (field 3), which
        puts utime (field 14) and stime (field 15) at indexes 11 and 12.
        """
        with open(statfile, "r") as fh:
            stat = fh.readline()
        fields = stat.rsplit(")", 1)[1].split()
        utime = int(fields[11])
        stime = int(fields[12])
        return utime + stime

    def _vcpu_timing(self, pid, tid_list):
        """Sample CPU time of each thread in ``tid_list`` of process ``pid``.

        Returns a list with one TimingRecord(tid, timestamp, cpu_ms) per
        thread; all records of one call share the same timestamp.
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])

        records = []
        for tid in tid_list:
            ticks = self._read_cpu_ticks(
                "/proc/%d/task/%d/stat" % (pid, tid))
            records.append(
                TimingRecord(tid, now, 1000 * ticks / jiffies_per_sec))
        return records

    def _cpu_timing(self, pid):
        """Sample the CPU time of process ``pid``.

        Returns TimingRecord(pid, timestamp, cpu_ms).
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        ticks = self._read_cpu_ticks("/proc/%d/stat" % pid)
        return TimingRecord(pid, now, 1000 * ticks / jiffies_per_sec)

    def _migrate_progress(self, vm):
        """Run ``query-migrate`` on ``vm`` and wrap the reply in a Progress.

        Any statistic missing from the reply defaults to 0; a missing
        status defaults to "active".
        """
        info = vm.command("query-migrate")

        if "ram" not in info:
            info["ram"] = {}
        ram = info["ram"]

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
        )

    @staticmethod
    def _enable_capability(vm, name):
        """Turn on the migration capability ``name`` on ``vm``."""
        vm.command("migrate-set-capabilities",
                   capabilities=[{"capability": name, "state": True}])

    def _sample_cpu(self, pid, threads, qemu_times, vcpu_times):
        """Append one process sample and one per-thread sample set."""
        qemu_times.append(self._cpu_timing(pid))
        vcpu_times.extend(self._vcpu_timing(pid, threads))

    def _configure_migration(self, hardware, scenario, src, dst):
        """Apply the scenario's capabilities and parameters to both VMs."""
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            src.command("migrate-set-parameters",
                        cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        src.command("migrate-set-parameters",
                    max_bandwidth=scenario._bandwidth * 1024 * 1024)

        src.command("migrate-set-parameters",
                    downtime_limit=scenario._downtime)

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            src.command("migrate-set-parameters",
                        xbzrle_cache_size=(
                            hardware._mem *
                            1024 * 1024 * 1024 / 100 *
                            scenario._compression_xbzrle_cache))

        if scenario._multifd:
            self._enable_capability(src, "multifd")
            src.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)
            self._enable_capability(dst, "multifd")
            dst.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Run one migration from ``src`` to ``dst`` and collect statistics.

        Returns ``[progress_history, src_qemu_time, src_vcpu_time]``
        once the migration status becomes "completed", "failed" or
        "cancelled".  The migration is cancelled when it exceeds
        ``scenario._max_iters`` RAM iterations or ``scenario._max_time``
        seconds.
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.command("query-cpus-fast")
        src_threads = [vcpu["thread-id"] for vcpu in vcpus]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run"
                  % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            self._sample_cpu(src_pid, src_threads,
                             src_qemu_time, src_vcpu_time)
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")

        self._configure_migration(hardware, scenario, src, dst)
        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Polling runs every 50ms; CPU usage is sampled every 20th poll
            if (loop % 20) == 0:
                self._sample_cpu(src_pid, src_threads,
                                 src_qemu_time, src_vcpu_time)

            # Only record a history entry when a new RAM iteration began
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run"
                              % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        self._sample_cpu(src_pid, src_threads,
                                         src_qemu_time, src_vcpu_time)
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM"
                          % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds"
                          % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations"
                          % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations"
                          % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _is_ppc64le(self):
        """Return True when the local machine type is "ppc64le"."""
        return os.uname()[4] == "ppc64le"

    def _get_guest_console_args(self):
        """Return the guest kernel ``console=`` argument for this host."""
        if self._is_ppc64le():
            return "console=hvc0"
        else:
            return "console=ttyS0"

    def _get_qemu_serial_args(self):
        """Return QEMU arguments wiring a stdio chardev to a serial device."""
        if self._is_ppc64le():
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "spapr-vty,chardev=cdev0"]
        else:
            return ["-chardev", "stdio,id=cdev0",
                    "-device", "isa-serial,chardev=cdev0"]

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argv shared by the source and destination VMs.

        When ``tunnelled`` is true the kernel command line is wrapped in
        single quotes.
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
        ]

        args.append(self._get_guest_console_args())

        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-accel", "kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        argv.extend(self._get_qemu_serial_args())

        if self._debug:
            argv.extend(["-device", "sga"])

        # These used to append to an undefined name ("argv_source"),
        # raising NameError as soon as either option was enabled.
        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # No arguments are added for huge pages
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU argv for the source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU argv for the destination VM listening on ``uri``."""
        tunnelled = self._dst_host != "localhost"
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return a ``numactl`` command prefix, or [] if nothing is bound."""
        wrapper = []
        if cpu_bind or mem_bind:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command prefix used to launch the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind,
                                        hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command prefix used to launch the destination VM.

        For a non-local destination the command is run through ssh with
        remote port 9001 forwarded back to local port 9001.
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind,
                                           hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        else:
            return wrapper

    def _get_timings(self, vm):
        """Extract guest timing records from the log of ``vm``.

        Lines of the form
        ``<name> (<tid>): INFO: <T>ms copied <N> GB in <D>ms`` become
        TimingRecord(tid, T / 1000.0, D).  Returns [] if there is no log.
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        """Launch both VMs, migrate, and return a Report.

        ``result_dir`` is accepted for interface compatibility but is
        not currently used by this method.

        Raises Exception if the "unix" transport is requested for a
        non-local destination host or if the transport is unknown.  Any
        exception raised while the VMs are running is re-raised after a
        best-effort shutdown of both VMs.
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Best-effort removal of a stale socket from an earlier run
            try:
                os.remove(uri[5:])
            except OSError:
                pass
        else:
            # Previously fell through and failed later with 'uri' unbound
            raise Exception("Unknown migration transport '%s'"
                            % self._transport)

        if self._dst_host != "localhost":
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
                os.remove(uri[5:])

            if os.path.exists(srcmonaddr):
                os.remove(srcmonaddr)

            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
                os.remove(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) +
                                  self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Shutdown is best-effort here; the original error is what
            # gets re-raised below.
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise