1from __future__ import print_function
2#
3# Migration test main engine
4#
5# Copyright (c) 2016 Red Hat, Inc.
6#
7# This library is free software; you can redistribute it and/or
8# modify it under the terms of the GNU Lesser General Public
9# License as published by the Free Software Foundation; either
10# version 2 of the License, or (at your option) any later version.
11#
12# This library is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15# Lesser General Public License for more details.
16#
17# You should have received a copy of the GNU Lesser General Public
18# License along with this library; if not, see <http://www.gnu.org/licenses/>.
19#
20
21
22import os
23import re
24import sys
25import time
26
27from guestperf.progress import Progress, ProgressStats
28from guestperf.report import Report
29from guestperf.timings import TimingRecord, Timings
30
31sys.path.append(os.path.join(os.path.dirname(__file__),
32                             '..', '..', '..', 'python'))
33from qemu.machine import QEMUMachine
34
35
36class Engine(object):
37
38    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
39                 sleep=15, verbose=False, debug=False):
40
41        self._binary = binary # Path to QEMU binary
42        self._dst_host = dst_host # Hostname of target host
43        self._kernel = kernel # Path to kernel image
44        self._initrd = initrd # Path to stress initrd
45        self._transport = transport # 'unix' or 'tcp' or 'rdma'
46        self._sleep = sleep
47        self._verbose = verbose
48        self._debug = debug
49
50        if debug:
51            self._verbose = debug
52
53    def _vcpu_timing(self, pid, tid_list):
54        records = []
55        now = time.time()
56
57        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
58        for tid in tid_list:
59            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
60            with open(statfile, "r") as fh:
61                stat = fh.readline()
62                fields = stat.split(" ")
63                stime = int(fields[13])
64                utime = int(fields[14])
65                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
66        return records
67
68    def _cpu_timing(self, pid):
69        records = []
70        now = time.time()
71
72        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
73        statfile = "/proc/%d/stat" % pid
74        with open(statfile, "r") as fh:
75            stat = fh.readline()
76            fields = stat.split(" ")
77            stime = int(fields[13])
78            utime = int(fields[14])
79            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
80
81    def _migrate_progress(self, vm):
82        info = vm.command("query-migrate")
83
84        if "ram" not in info:
85            info["ram"] = {}
86
87        return Progress(
88            info.get("status", "active"),
89            ProgressStats(
90                info["ram"].get("transferred", 0),
91                info["ram"].get("remaining", 0),
92                info["ram"].get("total", 0),
93                info["ram"].get("duplicate", 0),
94                info["ram"].get("skipped", 0),
95                info["ram"].get("normal", 0),
96                info["ram"].get("normal-bytes", 0),
97                info["ram"].get("dirty-pages-rate", 0),
98                info["ram"].get("mbps", 0),
99                info["ram"].get("dirty-sync-count", 0)
100            ),
101            time.time(),
102            info.get("total-time", 0),
103            info.get("downtime", 0),
104            info.get("expected-downtime", 0),
105            info.get("setup-time", 0),
106            info.get("x-cpu-throttle-percentage", 0),
107        )
108
    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Drive one migration from src to dst and collect measurements.

        The method samples source CPU usage while the guest workload warms
        up, applies the migration capabilities and parameters requested by
        the scenario, starts the migration and then polls its progress until
        it reaches a terminal state.

        :param hardware: hardware description; only _mem is read here
        :param scenario: scenario description providing the migration tunables
        :param src: source VM, used for monitor commands and its PID
        :param dst: destination VM, used for monitor commands
        :param connect_uri: URI passed to the "migrate" command
        :returns: list [progress_history, src_qemu_time, src_vcpu_time]
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        # Collect the thread ID of every vCPU reported by the source VM
        vcpus = src.command("query-cpus")
        src_threads = []
        for vcpu in vcpus:
            src_threads.append(vcpu["thread_id"])

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        # Sample CPU usage once per second while the workload warms up.
        # NOTE(review): the "> 1" bound gives one iteration fewer than
        # self._sleep for integer values - confirm this is intended.
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        # The "resp" values assigned below are never read in this method.
        if scenario._auto_converge:
            # Auto-converge is configured on the source only
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "auto-converge",
                                     "state": True }
                               ])
            resp = src.command("migrate-set-parameters",
                               x_cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            # Post-copy is enabled on both source and destination
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "postcopy-ram",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "postcopy-ram",
                                     "state": True }
                               ])

        # presumably _bandwidth is in MiB/s and the command wants bytes/s
        # - TODO confirm
        resp = src.command("migrate_set_speed",
                           value=scenario._bandwidth * 1024 * 1024)

        # NOTE(review): divides by 1024.0 rather than 1000 - confirm the
        # intended unit conversion for _downtime
        resp = src.command("migrate_set_downtime",
                           value=scenario._downtime / 1024.0)

        if scenario._compression_mt:
            # Multi-thread compression: compress threads on the source,
            # decompress threads on the destination, same thread count
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "compress",
                                     "state": True }
                               ])
            resp = src.command("migrate-set-parameters",
                               compress_threads=scenario._compression_mt_threads)
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "compress",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-parameters",
                               decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            resp = src.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "xbzrle",
                                     "state": True }
                               ])
            resp = dst.command("migrate-set-capabilities",
                               capabilities = [
                                   { "capability": "xbzrle",
                                     "state": True }
                               ])
            # presumably _mem is in GB and _compression_xbzrle_cache is a
            # percentage of guest RAM - TODO confirm
            resp = src.command("migrate-set-cache-size",
                               value=(hardware._mem * 1024 * 1024 * 1024 / 100 *
                                      scenario._compression_xbzrle_cache))

        resp = src.command("migrate", uri=connect_uri)

        # Track one-shot actions so they are triggered only once
        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            # Poll every 50ms
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # Every 20th poll (roughly once per second) sample CPU usage
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Record a history entry only when the iteration count advances
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            # Terminal states: this is the only exit from the loop
            if progress._status in ("completed", "failed", "cancelled"):
                # If the source was stopped by this method, resume the guest
                # on the destination
                if progress._status == "completed" and paused:
                    dst.command("cont")
                # Always keep the final sample in the history
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    # Keep sampling the source process once per second after
                    # completion (sleeping first, then sampling)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # Give up after too many iterations. The cancel request is
            # followed by "continue" so the post-copy/pause checks are
            # skipped and polling goes on until a terminal status is seen.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            # Give up after too much wall-clock time, same handling as above
            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            # Switch to post-copy once, after the configured iteration count
            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                resp = src.command("migrate-start-postcopy")
                post_copy = True

            # Stop the source guest once, after the configured iteration count
            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                resp = src.command("stop")
                paused = True
267
268    def _get_common_args(self, hardware, tunnelled=False):
269        args = [
270            "noapic",
271            "edd=off",
272            "printk.time=1",
273            "noreplace-smp",
274            "cgroup_disable=memory",
275            "pci=noearly",
276            "console=ttyS0",
277        ]
278        if self._debug:
279            args.append("debug")
280        else:
281            args.append("quiet")
282
283        args.append("ramsize=%s" % hardware._mem)
284
285        cmdline = " ".join(args)
286        if tunnelled:
287            cmdline = "'" + cmdline + "'"
288
289        argv = [
290            "-accel", "kvm",
291            "-cpu", "host",
292            "-kernel", self._kernel,
293            "-initrd", self._initrd,
294            "-append", cmdline,
295            "-chardev", "stdio,id=cdev0",
296            "-device", "isa-serial,chardev=cdev0",
297            "-m", str((hardware._mem * 1024) + 512),
298            "-smp", str(hardware._cpus),
299        ]
300
301        if self._debug:
302            argv.extend(["-device", "sga"])
303
304        if hardware._prealloc_pages:
305            argv_source += ["-mem-path", "/dev/shm",
306                            "-mem-prealloc"]
307        if hardware._locked_pages:
308            argv_source += ["-realtime", "mlock=on"]
309        if hardware._huge_pages:
310            pass
311
312        return argv
313
314    def _get_src_args(self, hardware):
315        return self._get_common_args(hardware)
316
317    def _get_dst_args(self, hardware, uri):
318        tunnelled = False
319        if self._dst_host != "localhost":
320            tunnelled = True
321        argv = self._get_common_args(hardware, tunnelled)
322        return argv + ["-incoming", uri]
323
324    @staticmethod
325    def _get_common_wrapper(cpu_bind, mem_bind):
326        wrapper = []
327        if len(cpu_bind) > 0 or len(mem_bind) > 0:
328            wrapper.append("numactl")
329            if cpu_bind:
330                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
331            if mem_bind:
332                wrapper.append("--membind=%s" % ",".join(mem_bind))
333
334        return wrapper
335
336    def _get_src_wrapper(self, hardware):
337        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
338
339    def _get_dst_wrapper(self, hardware):
340        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
341        if self._dst_host != "localhost":
342            return ["ssh",
343                    "-R", "9001:localhost:9001",
344                    self._dst_host] + wrapper
345        else:
346            return wrapper
347
348    def _get_timings(self, vm):
349        log = vm.get_log()
350        if not log:
351            return []
352        if self._debug:
353            print(log)
354
355        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
356        matcher = re.compile(regex)
357        records = []
358        for line in log.split("\n"):
359            match = matcher.match(line)
360            if match:
361                records.append(TimingRecord(int(match.group(1)),
362                                            int(match.group(2)) / 1000.0,
363                                            int(match.group(3))))
364        return records
365
366    def run(self, hardware, scenario, result_dir=os.getcwd()):
367        abs_result_dir = os.path.join(result_dir, scenario._name)
368
369        if self._transport == "tcp":
370            uri = "tcp:%s:9000" % self._dst_host
371        elif self._transport == "rdma":
372            uri = "rdma:%s:9000" % self._dst_host
373        elif self._transport == "unix":
374            if self._dst_host != "localhost":
375                raise Exception("Running use unix migration transport for non-local host")
376            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
377            try:
378                os.remove(uri[5:])
379                os.remove(monaddr)
380            except:
381                pass
382
383        if self._dst_host != "localhost":
384            dstmonaddr = ("localhost", 9001)
385        else:
386            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
387        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
388
389        src = QEMUMachine(self._binary,
390                          args=self._get_src_args(hardware),
391                          wrapper=self._get_src_wrapper(hardware),
392                          name="qemu-src-%d" % os.getpid(),
393                          monitor_address=srcmonaddr)
394
395        dst = QEMUMachine(self._binary,
396                          args=self._get_dst_args(hardware, uri),
397                          wrapper=self._get_dst_wrapper(hardware),
398                          name="qemu-dst-%d" % os.getpid(),
399                          monitor_address=dstmonaddr)
400
401        try:
402            src.launch()
403            dst.launch()
404
405            ret = self._migrate(hardware, scenario, src, dst, uri)
406            progress_history = ret[0]
407            qemu_timings = ret[1]
408            vcpu_timings = ret[2]
409            if uri[0:5] == "unix:":
410                os.remove(uri[5:])
411            if self._verbose:
412                print("Finished migration")
413
414            src.shutdown()
415            dst.shutdown()
416
417            return Report(hardware, scenario, progress_history,
418                          Timings(self._get_timings(src) + self._get_timings(dst)),
419                          Timings(qemu_timings),
420                          Timings(vcpu_timings),
421                          self._binary, self._dst_host, self._kernel,
422                          self._initrd, self._transport, self._sleep)
423        except Exception as e:
424            if self._debug:
425                print("Failed: %s" % str(e))
426            try:
427                src.shutdown()
428            except:
429                pass
430            try:
431                dst.shutdown()
432            except:
433                pass
434
435            if self._debug:
436                print(src.get_log())
437                print(dst.get_log())
438            raise
439
440