1#
2# Migration test main engine
3#
4# Copyright (c) 2016 Red Hat, Inc.
5#
6# This library is free software; you can redistribute it and/or
7# modify it under the terms of the GNU Lesser General Public
8# License as published by the Free Software Foundation; either
9# version 2 of the License, or (at your option) any later version.
10#
11# This library is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public
17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
18#
19
20
21import os
22import re
23import sys
24import time
25
26sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'scripts'))
27import qemu
28import qmp.qmp
29from guestperf.progress import Progress, ProgressStats
30from guestperf.report import Report
31from guestperf.timings import TimingRecord, Timings
32
33
class Engine(object):
    """Run a guest migration between two QEMU instances and collect stats.

    An Engine launches a source and a destination QEMU, triggers a
    migration over the configured transport, polls its progress and
    returns a Report describing the outcome.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the settings shared by every run().

        :param binary: path to the QEMU binary
        :param dst_host: hostname of the target host; any value other
                         than "localhost" makes the destination QEMU be
                         launched through ssh
        :param kernel: path to the kernel image
        :param initrd: path to the stress initrd
        :param transport: 'unix' or 'tcp' or 'rdma'
        :param sleep: seconds used for the workload runs before and
                      after the migration
        :param verbose: print progress messages
        :param debug: print debug output; also turns on verbose
        """
        self._binary = binary  # Path to QEMU binary
        self._dst_host = dst_host  # Hostname of target host
        self._kernel = kernel  # Path to kernel image
        self._initrd = initrd  # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        # Debug output implies verbose output.
        if debug:
            self._verbose = debug

    @staticmethod
    def _stat_cpu_jiffies(stat):
        """Return utime + stime (in clock ticks) from a /proc stat line.

        The second field (comm) is enclosed in parentheses and may itself
        contain spaces, so the line is split after the last ')' instead of
        blindly on spaces; otherwise every later field index would shift.
        """
        fields = stat.rsplit(")", 1)[-1].split()
        # fields[0] is the state (3rd field of the line); utime and stime
        # are the 14th and 15th fields, i.e. offsets 11 and 12 from here.
        utime = int(fields[11])
        stime = int(fields[12])
        return utime + stime

    @staticmethod
    def _read_cpu_ms(statfile, jiffies_per_sec):
        """Return the CPU time, in milliseconds, recorded in statfile."""
        with open(statfile, "r") as fh:
            stat = fh.readline()
        return 1000 * Engine._stat_cpu_jiffies(stat) / jiffies_per_sec

    def _vcpu_timing(self, pid, tid_list):
        """Return one TimingRecord per thread in tid_list of process pid.

        Each record holds the thread id, the current wall clock time and
        the thread's accumulated user + system CPU time in milliseconds.
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])

        records = []
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            records.append(
                TimingRecord(tid, now,
                             self._read_cpu_ms(statfile, jiffies_per_sec)))
        return records

    def _cpu_timing(self, pid):
        """Return a TimingRecord with the CPU time (ms) of process pid."""
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])

        statfile = "/proc/%d/stat" % pid
        return TimingRecord(pid, now,
                            self._read_cpu_ms(statfile, jiffies_per_sec))

    def _migrate_progress(self, vm):
        """Query vm for migration status and wrap it in a Progress.

        Any key missing from the "query-migrate" reply defaults to 0,
        except "status" which defaults to "active".
        """
        info = vm.command("query-migrate")
        ram = info.get("ram", {})

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("x-cpu-throttle-percentage", 0),
        )

    @staticmethod
    def _enable_capability(vm, capability):
        """Turn on a single migration capability on vm."""
        vm.command("migrate-set-capabilities",
                   capabilities=[{"capability": capability,
                                  "state": True}])

    def _sample_cpu(self, pid, threads, qemu_time, vcpu_time):
        """Append fresh process and per-thread CPU timings to the lists."""
        qemu_time.append(self._cpu_timing(pid))
        vcpu_time.extend(self._vcpu_timing(pid, threads))

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Migrate src to dst and record progress and CPU usage.

        :param hardware: provides _mem, used to size the xbzrle cache
        :param scenario: provides the migration tunables (bandwidth,
                         downtime, compression, post-copy, pause and the
                         max_iters / max_time limits)
        :param src: running source VM
        :param dst: running destination VM
        :param connect_uri: URI handed to the "migrate" command
        :returns: [progress_history, src_qemu_time, src_vcpu_time]
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        src_threads = [vcpu["thread_id"]
                       for vcpu in src.command("query-cpus")]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run"
                  % self._sleep)
        # One sample per second while more than one second of the
        # requested sleep remains.
        sleep_secs = self._sleep
        while sleep_secs > 1:
            self._sample_cpu(src_pid, src_threads,
                             src_qemu_time, src_vcpu_time)
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            src.command("migrate-set-parameters",
                        x_cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        src.command("migrate_set_speed",
                    value=scenario._bandwidth * 1024 * 1024)

        # NOTE(review): a divisor of 1024.0 looks odd for a time value;
        # if scenario._downtime is in milliseconds and the command wants
        # seconds this should presumably be 1000.0 -- confirm units.
        src.command("migrate_set_downtime",
                    value=scenario._downtime / 1024.0)

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            # Cache is sized as a percentage of guest RAM (_mem in GB).
            src.command("migrate-set-cache-size",
                        value=(hardware._mem * 1024 * 1024 * 1024 / 100 *
                               scenario._compression_xbzrle_cache))

        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop += 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # 20 polls of 50ms each: sample CPU roughly once a second.
            if (loop % 20) == 0:
                self._sample_cpu(src_pid, src_threads,
                                 src_qemu_time, src_vcpu_time)

            # Only keep one progress entry per pass over guest RAM.
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    # The guest was stopped on the source, so it has to
                    # be resumed explicitly on the destination.
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest "
                              "workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        self._sample_cpu(src_pid, src_threads,
                                         src_qemu_time, src_vcpu_time)
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # After a cancel request keep polling until the status
            # actually changes to a final one.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM"
                          % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds"
                          % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations"
                          % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations"
                          % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU arguments shared by source and destination.

        :param hardware: provides _mem, _cpus, _prealloc_pages,
                         _locked_pages and _huge_pages
        :param tunnelled: wrap the kernel command line in single quotes
                          (used when the command is run through ssh)
        :returns: list of QEMU command line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-machine", "accel=kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            # Guest RAM is _mem * 1024 plus an extra 512 on top.
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            argv += ["-realtime", "mlock=on"]
        if hardware._huge_pages:
            # Huge pages are accepted but currently add no arguments.
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU arguments for the source VM."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU arguments for the destination VM.

        The destination listens for the incoming migration on uri.
        """
        tunnelled = self._dst_host != "localhost"
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return a numactl command prefix for the given bindings.

        :param cpu_bind: list of CPU id strings to bind to (may be empty)
        :param mem_bind: list of NUMA node id strings (may be empty)
        :returns: [] when neither binding is requested
        """
        wrapper = []
        if cpu_bind or mem_bind:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command prefix used to launch the source VM."""
        return self._get_common_wrapper(hardware._src_cpu_bind,
                                        hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command prefix used to launch the destination VM.

        For a non-local destination the command is run through ssh, with
        remote port 9001 forwarded back to local port 9001.
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind,
                                           hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        return wrapper

    def _get_timings(self, vm):
        """Extract TimingRecords from the console log of vm.

        Lines that match the "... (<id>): INFO: <N>ms copied <N> GB in
        <N>ms" pattern yield one record each; anything else is ignored.

        :returns: list of TimingRecord, empty when there is no log
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                # The second group is in milliseconds; store seconds.
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=os.getcwd()):
        """Launch both VMs, run the migration and return a Report.

        :param hardware: hardware configuration for the guest
        :param scenario: migration scenario to execute
        :param result_dir: accepted for interface compatibility; not
                           used by this method
        :raises Exception: for an unknown transport, or when the unix
                           transport is requested for a non-local host;
                           any failure during the run is re-raised after
                           both VMs have been shut down
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Best effort removal of a stale socket from an earlier run.
            try:
                os.remove(uri[5:])
            except OSError:
                pass
        else:
            raise Exception("Unknown migration transport '%s'" %
                            self._transport)

        if self._dst_host != "localhost":
            # Matches the port forwarded by the ssh wrapper.
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = qemu.QEMUMachine(self._binary,
                               args=self._get_src_args(hardware),
                               wrapper=self._get_src_wrapper(hardware),
                               name="qemu-src-%d" % os.getpid(),
                               monitor_address=srcmonaddr,
                               debug=self._debug)

        dst = qemu.QEMUMachine(self._binary,
                               args=self._get_dst_args(hardware, uri),
                               wrapper=self._get_dst_wrapper(hardware),
                               name="qemu-dst-%d" % os.getpid(),
                               monitor_address=dstmonaddr,
                               debug=self._debug)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:":
                os.remove(uri[5:])
            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Best effort cleanup: a failing shutdown must not hide the
            # original error, which is re-raised below.
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise
439
440