#
# Migration test main engine
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
#


import os
import re
import sys
import time

from guestperf.progress import Progress, ProgressStats
from guestperf.report import Report
from guestperf.timings import TimingRecord, Timings

sys.path.append(os.path.join(os.path.dirname(__file__),
                             '..', '..', '..', 'python'))
from qemu.machine import QEMUMachine


class Engine(object):
    """Drive one live-migration run between a source and a destination QEMU.

    The engine launches both QEMU processes, configures the migration
    according to a scenario, polls migration progress while sampling the
    CPU time consumed by the source process and its vCPU threads, and
    bundles everything into a Report.
    """

    # Matches the timing lines scraped from the QEMU log in _get_timings().
    _TIMING_RE = re.compile(
        r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms")

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the run configuration.

        :param binary: path to the QEMU binary
        :param dst_host: hostname of the target host
        :param kernel: path to the kernel image
        :param initrd: path to the stress initrd
        :param transport: 'unix', 'tcp' or 'rdma'
        :param sleep: seconds used for the workload runs before and
                      after the migration
        :param verbose: print progress messages
        :param debug: print debugging output; also turns on verbose
        """
        self._binary = binary  # Path to QEMU binary
        self._dst_host = dst_host  # Hostname of target host
        self._kernel = kernel  # Path to kernel image
        self._initrd = initrd  # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        if debug:
            self._verbose = debug

    @staticmethod
    def _read_cpu_time_ms(statfile):
        """Return user + system CPU time, in milliseconds, from a stat file.

        :param statfile: path of a /proc/<pid>/stat style file
        """
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        with open(statfile, "r") as fh:
            stat = fh.readline()

        # The second field is the command name in parentheses and may
        # itself contain spaces, so only split what follows the last ')'.
        # That remainder starts at the third field, which puts utime
        # (field 14) at index 11 and stime (field 15) at index 12.
        fields = stat.rpartition(")")[2].split()
        utime = int(fields[11])
        stime = int(fields[12])
        return 1000 * (utime + stime) / jiffies_per_sec

    def _vcpu_timing(self, pid, tid_list):
        """Return one TimingRecord per thread id in tid_list.

        All records share a single timestamp taken before the first read.
        """
        now = time.time()
        return [
            TimingRecord(
                tid, now,
                self._read_cpu_time_ms("/proc/%d/task/%d/stat" % (pid, tid)))
            for tid in tid_list
        ]

    def _cpu_timing(self, pid):
        """Return a TimingRecord with the CPU time used by process pid."""
        now = time.time()
        return TimingRecord(
            pid, now, self._read_cpu_time_ms("/proc/%d/stat" % pid))

    def _sample_timings(self, pid, threads, qemu_time, vcpu_time):
        """Append a process sample and per-thread samples to the lists."""
        qemu_time.append(self._cpu_timing(pid))
        vcpu_time.extend(self._vcpu_timing(pid, threads))

    def _migrate_progress(self, vm):
        """Query the migration status of vm and wrap it in a Progress.

        Keys missing from the "query-migrate" reply default to 0, and a
        missing status defaults to "active".
        """
        info = vm.command("query-migrate")
        ram = info.get("ram", {})

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("cpu-throttle-percentage", 0),
        )

    @staticmethod
    def _enable_capability(vm, capability):
        """Turn on a single migration capability on vm."""
        vm.command("migrate-set-capabilities",
                   capabilities=[
                       {"capability": capability,
                        "state": True}
                   ])

    def _configure_migration(self, hardware, scenario, src, dst):
        """Apply the scenario's capabilities and parameters to both VMs."""
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            src.command("migrate-set-parameters",
                        cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        src.command("migrate-set-parameters",
                    max_bandwidth=scenario._bandwidth * 1024 * 1024)

        # NOTE(review): QMP documents 'downtime-limit' in milliseconds;
        # confirm the unit of scenario._downtime and whether dividing by
        # 1024.0 is really intended here.
        src.command("migrate-set-parameters",
                    downtime_limit=scenario._downtime / 1024.0)

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            src.command("migrate-set-parameters",
                        xbzrle_cache_size=(
                            hardware._mem *
                            1024 * 1024 * 1024 / 100 *
                            scenario._compression_xbzrle_cache))

        if scenario._multifd:
            self._enable_capability(src, "multifd")
            src.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)
            self._enable_capability(dst, "multifd")
            dst.command("migrate-set-parameters",
                        multifd_channels=scenario._multifd_channels)

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Run the migration from src to dst and collect measurements.

        :param hardware: hardware description; its _mem sizes the xbzrle
                         cache
        :param scenario: migration scenario holding the tunables
        :param src: launched source VM
        :param dst: launched destination VM
        :param connect_uri: URI handed to the "migrate" command
        :returns: [progress_history, src_qemu_time, src_vcpu_time] once
                  the migration has completed, failed or been cancelled
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        vcpus = src.command("query-cpus-fast")
        src_threads = [vcpu["thread_id"] for vcpu in vcpus]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        # NOTE(review): the '> 1' bound makes this loop run one time fewer
        # than self._sleep; kept as-is so results stay comparable.
        sleep_secs = self._sleep
        while sleep_secs > 1:
            self._sample_timings(src_pid, src_threads,
                                 src_qemu_time, src_vcpu_time)
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        self._configure_migration(hardware, scenario, src, dst)

        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Polling runs every 50ms; CPU timings are sampled once a second.
            if (loop % 20) == 0:
                self._sample_timings(src_pid, src_threads,
                                     src_qemu_time, src_vcpu_time)

            # Record one progress entry per completed pass over RAM.
            if (not progress_history or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        self._sample_timings(src_pid, src_threads,
                                             src_qemu_time, src_vcpu_time)
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # After a cancel request keep polling until the reported
            # status becomes terminal.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argument list shared by source and destination.

        :param hardware: hardware description (_mem, _cpus and the
                         _prealloc_pages/_locked_pages/_huge_pages flags)
        :param tunnelled: wrap the kernel command line in single quotes
        :returns: list of command line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-accel", "kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # No arguments are added for huge pages.
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU arguments for the source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU arguments for the destination VM.

        The kernel command line is quoted when the destination is not
        "localhost", and "-incoming uri" is appended.
        """
        tunnelled = self._dst_host != "localhost"
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return a numactl prefix for the given bindings.

        :param cpu_bind: list of CPU ids as strings, may be empty
        :param mem_bind: list of memory node ids as strings, may be empty
        :returns: argument list, empty when neither binding is requested
        """
        wrapper = []
        if cpu_bind or mem_bind:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command prefix used to launch the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command prefix used to launch the destination VM.

        For a destination other than "localhost" the command is run via
        ssh with port 9001 forwarded back, which is where run() expects
        the destination monitor.
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        return wrapper

    def _get_timings(self, vm):
        """Extract TimingRecords from the log output of vm.

        :returns: list of records, empty when the VM produced no log
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        records = []
        for line in log.split("\n"):
            match = self._TIMING_RE.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    @staticmethod
    def _remove_if_exists(path):
        """Delete path if it is present."""
        if os.path.exists(path):
            os.remove(path)

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        """Launch both VMs, migrate, and return a Report of the run.

        :param hardware: hardware description for the guests
        :param scenario: migration scenario to execute
        :param result_dir: accepted for interface compatibility; not
                           used by this method
        :returns: Report with progress history and timing data
        :raises Exception: for an unsupported transport, or for the unix
                           transport with a non-local destination; any
                           error raised during the run is re-raised
                           after both VMs have been shut down
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Best effort: clear a stale socket left by an earlier run.
            try:
                os.remove(uri[5:])
            except OSError:
                pass
        else:
            raise Exception("Unsupported migration transport '%s'" % self._transport)

        if self._dst_host != "localhost":
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri.startswith("unix:"):
                self._remove_if_exists(uri[5:])

            self._remove_if_exists(srcmonaddr)

            # dstmonaddr is a (host, port) tuple for remote destinations.
            if self._dst_host == "localhost":
                self._remove_if_exists(dstmonaddr)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Shut both VMs down even if one of the shutdowns fails, then
            # let the original error propagate.
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise