1#
2# Migration test main engine
3#
4# Copyright (c) 2016 Red Hat, Inc.
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
18#
19
20
21import os
22import re
23import sys
24import time
25
26from guestperf.progress import Progress, ProgressStats
27from guestperf.report import Report
28from guestperf.timings import TimingRecord, Timings
29
30sys.path.append(os.path.join(os.path.dirname(__file__),
31                             '..', '..', '..', 'python'))
32from qemu.machine import QEMUMachine
33
34
35class Engine(object):
36
37    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
38                 sleep=15, verbose=False, debug=False):
39
40        self._binary = binary # Path to QEMU binary
41        self._dst_host = dst_host # Hostname of target host
42        self._kernel = kernel # Path to kernel image
43        self._initrd = initrd # Path to stress initrd
44        self._transport = transport # 'unix' or 'tcp' or 'rdma'
45        self._sleep = sleep
46        self._verbose = verbose
47        self._debug = debug
48
49        if debug:
50            self._verbose = debug
51
52    def _vcpu_timing(self, pid, tid_list):
53        records = []
54        now = time.time()
55
56        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
57        for tid in tid_list:
58            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
59            with open(statfile, "r") as fh:
60                stat = fh.readline()
61                fields = stat.split(" ")
62                stime = int(fields[13])
63                utime = int(fields[14])
64                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
65        return records
66
67    def _cpu_timing(self, pid):
68        now = time.time()
69
70        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
71        statfile = "/proc/%d/stat" % pid
72        with open(statfile, "r") as fh:
73            stat = fh.readline()
74            fields = stat.split(" ")
75            stime = int(fields[13])
76            utime = int(fields[14])
77            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
78
79    def _migrate_progress(self, vm):
80        info = vm.cmd("query-migrate")
81
82        if "ram" not in info:
83            info["ram"] = {}
84
85        return Progress(
86            info.get("status", "active"),
87            ProgressStats(
88                info["ram"].get("transferred", 0),
89                info["ram"].get("remaining", 0),
90                info["ram"].get("total", 0),
91                info["ram"].get("duplicate", 0),
92                info["ram"].get("skipped", 0),
93                info["ram"].get("normal", 0),
94                info["ram"].get("normal-bytes", 0),
95                info["ram"].get("dirty-pages-rate", 0),
96                info["ram"].get("mbps", 0),
97                info["ram"].get("dirty-sync-count", 0)
98            ),
99            time.time(),
100            info.get("total-time", 0),
101            info.get("downtime", 0),
102            info.get("expected-downtime", 0),
103            info.get("setup-time", 0),
104            info.get("cpu-throttle-percentage", 0),
105            info.get("dirty-limit-throttle-time-per-round", 0),
106            info.get("dirty-limit-ring-full-time", 0),
107        )
108
109    def _migrate(self, hardware, scenario, src, dst, connect_uri):
110        src_qemu_time = []
111        src_vcpu_time = []
112        src_pid = src.get_pid()
113
114        vcpus = src.cmd("query-cpus-fast")
115        src_threads = []
116        for vcpu in vcpus:
117            src_threads.append(vcpu["thread-id"])
118
119        # XXX how to get dst timings on remote host ?
120
121        if self._verbose:
122            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
123        sleep_secs = self._sleep
124        while sleep_secs > 1:
125            src_qemu_time.append(self._cpu_timing(src_pid))
126            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
127            time.sleep(1)
128            sleep_secs -= 1
129
130        if self._verbose:
131            print("Starting migration")
132        if scenario._auto_converge:
133            resp = src.cmd("migrate-set-capabilities",
134                           capabilities = [
135                               { "capability": "auto-converge",
136                                 "state": True }
137                           ])
138            resp = src.cmd("migrate-set-parameters",
139                           cpu_throttle_increment=scenario._auto_converge_step)
140
141        if scenario._post_copy:
142            resp = src.cmd("migrate-set-capabilities",
143                           capabilities = [
144                               { "capability": "postcopy-ram",
145                                 "state": True }
146                           ])
147            resp = dst.cmd("migrate-set-capabilities",
148                           capabilities = [
149                               { "capability": "postcopy-ram",
150                                 "state": True }
151                           ])
152
153        resp = src.cmd("migrate-set-parameters",
154                       max_bandwidth=scenario._bandwidth * 1024 * 1024)
155
156        resp = src.cmd("migrate-set-parameters",
157                       downtime_limit=scenario._downtime)
158
159        if scenario._compression_mt:
160            resp = src.cmd("migrate-set-capabilities",
161                           capabilities = [
162                               { "capability": "compress",
163                                 "state": True }
164                           ])
165            resp = src.cmd("migrate-set-parameters",
166                           compress_threads=scenario._compression_mt_threads)
167            resp = dst.cmd("migrate-set-capabilities",
168                           capabilities = [
169                               { "capability": "compress",
170                                 "state": True }
171                           ])
172            resp = dst.cmd("migrate-set-parameters",
173                           decompress_threads=scenario._compression_mt_threads)
174
175        if scenario._compression_xbzrle:
176            resp = src.cmd("migrate-set-capabilities",
177                           capabilities = [
178                               { "capability": "xbzrle",
179                                 "state": True }
180                           ])
181            resp = dst.cmd("migrate-set-capabilities",
182                           capabilities = [
183                               { "capability": "xbzrle",
184                                 "state": True }
185                           ])
186            resp = src.cmd("migrate-set-parameters",
187                           xbzrle_cache_size=(
188                               hardware._mem *
189                               1024 * 1024 * 1024 / 100 *
190                               scenario._compression_xbzrle_cache))
191
192        if scenario._multifd:
193            resp = src.cmd("migrate-set-capabilities",
194                           capabilities = [
195                               { "capability": "multifd",
196                                 "state": True }
197                           ])
198            resp = src.cmd("migrate-set-parameters",
199                           multifd_channels=scenario._multifd_channels)
200            resp = dst.cmd("migrate-set-capabilities",
201                           capabilities = [
202                               { "capability": "multifd",
203                                 "state": True }
204                           ])
205            resp = dst.cmd("migrate-set-parameters",
206                           multifd_channels=scenario._multifd_channels)
207
208        if scenario._dirty_limit:
209            if not hardware._dirty_ring_size:
210                raise Exception("dirty ring size must be configured when "
211                                "testing dirty limit migration")
212
213            resp = src.cmd("migrate-set-capabilities",
214                           capabilities = [
215                               { "capability": "dirty-limit",
216                                 "state": True }
217                           ])
218            resp = src.cmd("migrate-set-parameters",
219                x_vcpu_dirty_limit_period=scenario._x_vcpu_dirty_limit_period)
220            resp = src.cmd("migrate-set-parameters",
221                           vcpu_dirty_limit=scenario._vcpu_dirty_limit)
222
223        resp = src.cmd("migrate", uri=connect_uri)
224
225        post_copy = False
226        paused = False
227
228        progress_history = []
229
230        start = time.time()
231        loop = 0
232        while True:
233            loop = loop + 1
234            time.sleep(0.05)
235
236            progress = self._migrate_progress(src)
237            if (loop % 20) == 0:
238                src_qemu_time.append(self._cpu_timing(src_pid))
239                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
240
241            if (len(progress_history) == 0 or
242                (progress_history[-1]._ram._iterations <
243                 progress._ram._iterations)):
244                progress_history.append(progress)
245
246            if progress._status in ("completed", "failed", "cancelled"):
247                if progress._status == "completed" and paused:
248                    dst.cmd("cont")
249                if progress_history[-1] != progress:
250                    progress_history.append(progress)
251
252                if progress._status == "completed":
253                    if self._verbose:
254                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
255                    sleep_secs = self._sleep
256                    while sleep_secs > 1:
257                        time.sleep(1)
258                        src_qemu_time.append(self._cpu_timing(src_pid))
259                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
260                        sleep_secs -= 1
261
262                return [progress_history, src_qemu_time, src_vcpu_time]
263
264            if self._verbose and (loop % 20) == 0:
265                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
266                    progress._ram._iterations,
267                    progress._ram._remaining_bytes / (1024 * 1024),
268                    progress._ram._total_bytes / (1024 * 1024),
269                    progress._ram._transferred_bytes / (1024 * 1024),
270                    progress._ram._transfer_rate_mbs,
271                ))
272
273            if progress._ram._iterations > scenario._max_iters:
274                if self._verbose:
275                    print("No completion after %d iterations over RAM" % scenario._max_iters)
276                src.cmd("migrate_cancel")
277                continue
278
279            if time.time() > (start + scenario._max_time):
280                if self._verbose:
281                    print("No completion after %d seconds" % scenario._max_time)
282                src.cmd("migrate_cancel")
283                continue
284
285            if (scenario._post_copy and
286                progress._ram._iterations >= scenario._post_copy_iters and
287                not post_copy):
288                if self._verbose:
289                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
290                resp = src.cmd("migrate-start-postcopy")
291                post_copy = True
292
293            if (scenario._pause and
294                progress._ram._iterations >= scenario._pause_iters and
295                not paused):
296                if self._verbose:
297                    print("Pausing VM after %d iterations" % scenario._pause_iters)
298                resp = src.cmd("stop")
299                paused = True
300
301    def _is_ppc64le(self):
302        _, _, _, _, machine = os.uname()
303        if machine == "ppc64le":
304            return True
305        return False
306
307    def _get_guest_console_args(self):
308        if self._is_ppc64le():
309            return "console=hvc0"
310        else:
311            return "console=ttyS0"
312
313    def _get_qemu_serial_args(self):
314        if self._is_ppc64le():
315            return ["-chardev", "stdio,id=cdev0",
316                    "-device", "spapr-vty,chardev=cdev0"]
317        else:
318            return ["-chardev", "stdio,id=cdev0",
319                    "-device", "isa-serial,chardev=cdev0"]
320
321    def _get_common_args(self, hardware, tunnelled=False):
322        args = [
323            "noapic",
324            "edd=off",
325            "printk.time=1",
326            "noreplace-smp",
327            "cgroup_disable=memory",
328            "pci=noearly",
329        ]
330
331        args.append(self._get_guest_console_args())
332
333        if self._debug:
334            args.append("debug")
335        else:
336            args.append("quiet")
337
338        args.append("ramsize=%s" % hardware._mem)
339
340        cmdline = " ".join(args)
341        if tunnelled:
342            cmdline = "'" + cmdline + "'"
343
344        argv = [
345            "-cpu", "host",
346            "-kernel", self._kernel,
347            "-initrd", self._initrd,
348            "-append", cmdline,
349            "-m", str((hardware._mem * 1024) + 512),
350            "-smp", str(hardware._cpus),
351        ]
352        if hardware._dirty_ring_size:
353            argv.extend(["-accel", "kvm,dirty-ring-size=%s" %
354                         hardware._dirty_ring_size])
355        else:
356            argv.extend(["-accel", "kvm"])
357
358        argv.extend(self._get_qemu_serial_args())
359
360        if self._debug:
361            argv.extend(["-machine", "graphics=off"])
362
363        if hardware._prealloc_pages:
364            argv_source += ["-mem-path", "/dev/shm",
365                            "-mem-prealloc"]
366        if hardware._locked_pages:
367            argv_source += ["-overcommit", "mem-lock=on"]
368        if hardware._huge_pages:
369            pass
370
371        return argv
372
373    def _get_src_args(self, hardware):
374        return self._get_common_args(hardware)
375
376    def _get_dst_args(self, hardware, uri):
377        tunnelled = False
378        if self._dst_host != "localhost":
379            tunnelled = True
380        argv = self._get_common_args(hardware, tunnelled)
381        return argv + ["-incoming", uri]
382
383    @staticmethod
384    def _get_common_wrapper(cpu_bind, mem_bind):
385        wrapper = []
386        if len(cpu_bind) > 0 or len(mem_bind) > 0:
387            wrapper.append("numactl")
388            if cpu_bind:
389                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
390            if mem_bind:
391                wrapper.append("--membind=%s" % ",".join(mem_bind))
392
393        return wrapper
394
395    def _get_src_wrapper(self, hardware):
396        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
397
398    def _get_dst_wrapper(self, hardware):
399        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
400        if self._dst_host != "localhost":
401            return ["ssh",
402                    "-R", "9001:localhost:9001",
403                    self._dst_host] + wrapper
404        else:
405            return wrapper
406
407    def _get_timings(self, vm):
408        log = vm.get_log()
409        if not log:
410            return []
411        if self._debug:
412            print(log)
413
414        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
415        matcher = re.compile(regex)
416        records = []
417        for line in log.split("\n"):
418            match = matcher.match(line)
419            if match:
420                records.append(TimingRecord(int(match.group(1)),
421                                            int(match.group(2)) / 1000.0,
422                                            int(match.group(3))))
423        return records
424
425    def run(self, hardware, scenario, result_dir=os.getcwd()):
426        abs_result_dir = os.path.join(result_dir, scenario._name)
427
428        if self._transport == "tcp":
429            uri = "tcp:%s:9000" % self._dst_host
430        elif self._transport == "rdma":
431            uri = "rdma:%s:9000" % self._dst_host
432        elif self._transport == "unix":
433            if self._dst_host != "localhost":
434                raise Exception("Running use unix migration transport for non-local host")
435            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
436            try:
437                os.remove(uri[5:])
438                os.remove(monaddr)
439            except:
440                pass
441
442        if self._dst_host != "localhost":
443            dstmonaddr = ("localhost", 9001)
444        else:
445            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
446        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
447
448        src = QEMUMachine(self._binary,
449                          args=self._get_src_args(hardware),
450                          wrapper=self._get_src_wrapper(hardware),
451                          name="qemu-src-%d" % os.getpid(),
452                          monitor_address=srcmonaddr)
453
454        dst = QEMUMachine(self._binary,
455                          args=self._get_dst_args(hardware, uri),
456                          wrapper=self._get_dst_wrapper(hardware),
457                          name="qemu-dst-%d" % os.getpid(),
458                          monitor_address=dstmonaddr)
459
460        try:
461            src.launch()
462            dst.launch()
463
464            ret = self._migrate(hardware, scenario, src, dst, uri)
465            progress_history = ret[0]
466            qemu_timings = ret[1]
467            vcpu_timings = ret[2]
468            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
469                os.remove(uri[5:])
470
471            if os.path.exists(srcmonaddr):
472                os.remove(srcmonaddr)
473
474            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
475                os.remove(dstmonaddr)
476
477            if self._verbose:
478                print("Finished migration")
479
480            src.shutdown()
481            dst.shutdown()
482
483            return Report(hardware, scenario, progress_history,
484                          Timings(self._get_timings(src) + self._get_timings(dst)),
485                          Timings(qemu_timings),
486                          Timings(vcpu_timings),
487                          self._binary, self._dst_host, self._kernel,
488                          self._initrd, self._transport, self._sleep)
489        except Exception as e:
490            if self._debug:
491                print("Failed: %s" % str(e))
492            try:
493                src.shutdown()
494            except:
495                pass
496            try:
497                dst.shutdown()
498            except:
499                pass
500
501            if self._debug:
502                print(src.get_log())
503                print(dst.get_log())
504            raise
505
506