1#
2# Migration test main engine
3#
4# Copyright (c) 2016 Red Hat, Inc.
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2.1 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
18#
19
20
21import os
22import re
23import sys
24import time
25
26from guestperf.progress import Progress, ProgressStats
27from guestperf.report import Report
28from guestperf.timings import TimingRecord, Timings
29
30sys.path.append(os.path.join(os.path.dirname(__file__),
31                             '..', '..', '..', 'python'))
32from qemu.machine import QEMUMachine
33
34
class Engine(object):
    """Drive one guest-migration benchmark between two QEMU instances.

    A source QEMU boots the configured kernel + stress initrd, a destination
    QEMU is started with ``-incoming <uri>``, and the source is migrated to
    it.  While that happens, ``query-migrate`` progress and the host CPU time
    of the source QEMU process and of its vCPU threads are sampled.
    :meth:`run` returns everything collected as a ``Report``.
    """

    # Guest log lines of the form
    #   "<name> (<pid>): INFO: <n>ms copied <n> GB in <n>ms"
    # capture the pid, the first millisecond value (turned into seconds for
    # the record time) and the trailing duration in milliseconds.
    _TIMING_RE = re.compile(
        r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms")

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the benchmark configuration.

        :param binary: path to the QEMU binary used for both VMs
        :param dst_host: hostname of the migration target; anything other
            than "localhost" makes the destination run through ssh
        :param kernel: path to the guest kernel image
        :param initrd: path to the stress-workload initrd
        :param transport: migration transport: 'tcp', 'rdma' or 'unix'
        :param sleep: seconds of guest workload to sample before and after
            the migration itself
        :param verbose: print progress messages
        :param debug: print guest logs and failure details; implies verbose
        """
        self._binary = binary
        self._dst_host = dst_host
        self._kernel = kernel
        self._initrd = initrd
        self._transport = transport
        self._sleep = sleep
        # Debug output is a superset of verbose output.
        self._verbose = debug if debug else verbose
        self._debug = debug

    @staticmethod
    def _stat_cpu_ms(stat_line, jiffies_per_sec):
        """Return utime + stime from one ``/proc/.../stat`` line, in ms.

        The second field (comm) is parenthesised and may itself contain
        spaces (QEMU can name vCPU threads e.g. "CPU 0/KVM"), so the line
        is only split after the *last* ')'.  Per proc(5), utime and stime
        are fields 14 and 15 (1-based); counting from the state field
        (field 3) that follows comm they are at indices 11 and 12.
        """
        fields = stat_line[stat_line.rindex(")") + 1:].split()
        utime = int(fields[11])
        stime = int(fields[12])
        return 1000 * (utime + stime) / jiffies_per_sec

    @classmethod
    def _read_stat_cpu_ms(cls, statfile, jiffies_per_sec):
        """Read *statfile* and return its utime + stime in milliseconds."""
        with open(statfile, "r") as fh:
            return cls._stat_cpu_ms(fh.readline(), jiffies_per_sec)

    def _vcpu_timing(self, pid, tid_list):
        """Return one TimingRecord of cumulative CPU ms per thread of *pid*.

        All records share the same wall-clock timestamp.
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        return [TimingRecord(tid, now,
                             self._read_stat_cpu_ms(
                                 "/proc/%d/task/%d/stat" % (pid, tid),
                                 jiffies_per_sec))
                for tid in tid_list]

    def _cpu_timing(self, pid):
        """Return a TimingRecord of cumulative CPU ms for process *pid*."""
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        return TimingRecord(pid, now,
                            self._read_stat_cpu_ms("/proc/%d/stat" % pid,
                                                   jiffies_per_sec))

    def _sample_cpu_usage(self, pid, threads, qemu_time, vcpu_time):
        """Append one process-wide and one per-vCPU-thread CPU sample."""
        qemu_time.append(self._cpu_timing(pid))
        vcpu_time.extend(self._vcpu_timing(pid, threads))

    def _migrate_progress(self, vm):
        """Snapshot ``query-migrate`` on *vm* as a Progress object.

        Missing fields (e.g. before RAM transfer starts) default to 0.
        """
        info = vm.command("query-migrate")
        ram = info.get("ram", {})
        # QEMU >= 2.7 reports "cpu-throttle-percentage"; older releases used
        # the experimental "x-" prefixed name.
        throttle = info.get("cpu-throttle-percentage",
                            info.get("x-cpu-throttle-percentage", 0))

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            throttle,
        )

    @staticmethod
    def _enable_capability(vm, capability):
        """Turn on a single migration capability on *vm*."""
        vm.command("migrate-set-capabilities",
                   capabilities=[{"capability": capability,
                                  "state": True}])

    def _configure_migration(self, hardware, scenario, src, dst):
        """Apply the scenario's migration capabilities and parameters."""
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            # The "x-cpu-throttle-increment" name was dropped in QEMU 2.7.
            src.command("migrate-set-parameters",
                        cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        # max-bandwidth is an integer in bytes/sec; assumes _bandwidth is
        # MiB/s.
        src.command("migrate-set-parameters",
                    max_bandwidth=int(scenario._bandwidth * 1024 * 1024))

        # downtime-limit is an integer number of milliseconds.  The old
        # "/ 1024.0" was a leftover seconds conversion (migrate_set_downtime)
        # that produced a float QEMU does not accept.  Assumes _downtime is
        # in milliseconds.
        src.command("migrate-set-parameters",
                    downtime_limit=int(scenario._downtime))

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            # xbzrle-cache-size must be an integer byte count.
            # hardware._mem is GiB (see the "-m" computation in
            # _get_common_args); _compression_xbzrle_cache is a percentage.
            src.command("migrate-set-parameters",
                        xbzrle_cache_size=int(
                            hardware._mem * 1024 * 1024 * 1024 *
                            scenario._compression_xbzrle_cache / 100))

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Migrate *src* to *dst* via *connect_uri* and collect measurements.

        Samples source CPU usage for ``self._sleep`` seconds before starting.
        Then it polls ``query-migrate`` every 50 ms until the migration
        completes, fails or is cancelled.  The scenario's iteration/time
        limits, post-copy switch and pause are enforced along the way.
        After a successful migration it samples for another ``self._sleep``
        seconds.

        :returns: ``[progress_history, src_qemu_time, src_vcpu_time]``
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        # query-cpus-fast names the field "thread-id"; only the legacy
        # query-cpus command used "thread_id".
        src_threads = [vcpu["thread-id"]
                       for vcpu in src.command("query-cpus-fast")]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 0:
            self._sample_cpu_usage(src_pid, src_threads,
                                   src_qemu_time, src_vcpu_time)
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        self._configure_migration(hardware, scenario, src, dst)
        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False
        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop += 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Every 20 polls (~1 s) take a CPU usage sample.
            if loop % 20 == 0:
                self._sample_cpu_usage(src_pid, src_threads,
                                       src_qemu_time, src_vcpu_time)

            # Keep only one snapshot per RAM iteration.
            if (not progress_history or
                    progress_history[-1]._ram._iterations <
                    progress._ram._iterations):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    # The source was stopped via "stop" below, so resume the
                    # destination explicitly.
                    dst.command("cont")
                # Always record the terminal state.
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 0:
                        time.sleep(1)
                        self._sample_cpu_usage(src_pid, src_threads,
                                               src_qemu_time, src_vcpu_time)
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and loop % 20 == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # On hitting a limit, ask for cancellation and keep polling until
            # query-migrate reports the terminal "cancelled" status.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and not post_copy and
                    progress._ram._iterations >= scenario._post_copy_iters):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and not paused and
                    progress._ram._iterations >= scenario._pause_iters):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU argv shared by the source and destination VMs.

        :param hardware: provides _mem, _cpus, _prealloc_pages,
            _locked_pages and _huge_pages
        :param tunnelled: True when the argv will run on a remote host via
            ssh, which needs the kernel command line shell-quoted
        :returns: list of QEMU command-line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        args.append("debug" if self._debug else "quiet")
        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            # ssh hands the remote command to a shell, so keep the
            # multi-word kernel command line together as one argument.
            cmdline = "'" + cmdline + "'"

        argv = [
            "-accel", "kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            # -m defaults to MiB: the guest RAM in GiB plus 512 MiB on top.
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        # These previously extended an undefined "argv_source" and raised
        # NameError whenever either option was enabled.
        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-overcommit", "mem-lock=on"]
        if hardware._huge_pages:
            # NOTE(review): huge page backing is accepted but not
            # implemented yet.
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU argv for the (always local) source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU argv for the destination VM listening on *uri*."""
        tunnelled = self._dst_host != "localhost"
        return self._get_common_args(hardware, tunnelled) + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return a numactl prefix pinning CPUs/memory, or [] if unpinned."""
        wrapper = []
        if len(cpu_bind) > 0 or len(mem_bind) > 0:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command prefix used to launch the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command prefix used to launch the destination VM.

        For a remote destination the VM runs over ssh.  "-R" forwards port
        9001 on the remote host back to localhost:9001 here, which is the
        destination monitor address used in run().
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        return wrapper

    def _get_timings(self, vm):
        """Parse the guest workload's timing lines out of *vm*'s log."""
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        records = []
        for line in log.split("\n"):
            match = self._TIMING_RE.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    @staticmethod
    def _remove_quietly(path):
        """Best-effort delete of *path*; missing or undeletable is ignored."""
        try:
            os.remove(path)
        except OSError:
            pass

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        """Launch both VMs, perform one migration and return a Report.

        :param hardware: VM sizing, pinning and memory options
        :param scenario: migration tuning and limits
        :param result_dir: currently unused; kept for API compatibility
        :raises ValueError: if the configured transport is unknown
        :raises Exception: if the unix transport is used with a remote
            host, or if launching/migrating fails (the VMs are shut down
            first)
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Clear a stale socket left behind by an earlier run.
            self._remove_quietly(uri[5:])
        else:
            raise ValueError("Unknown migration transport '%s'"
                             % self._transport)

        if self._dst_host != "localhost":
            # Reached through the ssh "-R" tunnel set up by _get_dst_wrapper.
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            progress_history, qemu_timings, vcpu_timings = self._migrate(
                hardware, scenario, src, dst, uri)

            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Best-effort teardown; the original error is re-raised below.
            for vm in (src, dst):
                try:
                    vm.shutdown()
                except Exception:
                    pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise
        finally:
            # Remove socket files on success and failure alike.  Removal is
            # best-effort so it cannot mask an exception in flight.
            if uri.startswith("unix:"):
                self._remove_quietly(uri[5:])
            self._remove_quietly(srcmonaddr)
            if self._dst_host == "localhost":
                self._remove_quietly(dstmonaddr)
447
448