1#
2# Copyright (C) 2013 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7# This module provides a class for starting qemu images using runqemu.
8# It's used by testimage.bbclass.
9
10import subprocess
11import os
12import sys
13import time
14import signal
15import re
16import socket
17import select
18import errno
19import string
20import threading
21import codecs
22import logging
23from oeqa.utils.dump import HostDumper
24from collections import defaultdict
25
26# Get Unicode non printable control chars
27control_range = list(range(0,32))+list(range(127,160))
28control_chars = [chr(x) for x in control_range
29                if chr(x) not in string.printable]
30re_control_char = re.compile('[%s]' % re.escape("".join(control_chars)))
31
32class QemuRunner:
33
34    def __init__(self, machine, rootfs, display, tmpdir, deploy_dir_image, logfile, boottime, dump_dir, dump_host_cmds,
35                 use_kvm, logger, use_slirp=False, serial_ports=2, boot_patterns = defaultdict(str), use_ovmf=False, workdir=None, tmpfsdir=None):
36
37        # Popen object for runqemu
38        self.runqemu = None
39        self.runqemu_exited = False
40        # pid of the qemu process that runqemu will start
41        self.qemupid = None
42        # target ip - from the command line or runqemu output
43        self.ip = None
44        # host ip - where qemu is running
45        self.server_ip = None
46        # target ip netmask
47        self.netmask = None
48
49        self.machine = machine
50        self.rootfs = rootfs
51        self.display = display
52        self.tmpdir = tmpdir
53        self.deploy_dir_image = deploy_dir_image
54        self.logfile = logfile
55        self.boottime = boottime
56        self.logged = False
57        self.thread = None
58        self.use_kvm = use_kvm
59        self.use_ovmf = use_ovmf
60        self.use_slirp = use_slirp
61        self.serial_ports = serial_ports
62        self.msg = ''
63        self.boot_patterns = boot_patterns
64        self.tmpfsdir = tmpfsdir
65
66        self.runqemutime = 120
67        if not workdir:
68            workdir = os.getcwd()
69        self.qemu_pidfile = workdir + '/pidfile_' + str(os.getpid())
70        self.host_dumper = HostDumper(dump_host_cmds, dump_dir)
71        self.monitorpipe = None
72
73        self.logger = logger
74
75        # Enable testing other OS's
76        # Set commands for target communication, and default to Linux ALWAYS
77        # Other OS's or baremetal applications need to provide their
78        # own implementation passing it through QemuRunner's constructor
79        # or by passing them through TESTIMAGE_BOOT_PATTERNS[flag]
80        # provided variables, where <flag> is one of the mentioned below.
81        accepted_patterns = ['search_reached_prompt', 'send_login_user', 'search_login_succeeded', 'search_cmd_finished']
82        default_boot_patterns = defaultdict(str)
83        # Default to the usual paterns used to communicate with the target
84        default_boot_patterns['search_reached_prompt'] = b' login:'
85        default_boot_patterns['send_login_user'] = 'root\n'
86        default_boot_patterns['search_login_succeeded'] = r"root@[a-zA-Z0-9\-]+:~#"
87        default_boot_patterns['search_cmd_finished'] = r"[a-zA-Z0-9]+@[a-zA-Z0-9\-]+:~#"
88
89        # Only override patterns that were set e.g. login user TESTIMAGE_BOOT_PATTERNS[send_login_user] = "webserver\n"
90        for pattern in accepted_patterns:
91            if not self.boot_patterns[pattern]:
92                self.boot_patterns[pattern] = default_boot_patterns[pattern]
93
94    def create_socket(self):
95        try:
96            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
97            sock.setblocking(0)
98            sock.bind(("127.0.0.1",0))
99            sock.listen(2)
100            port = sock.getsockname()[1]
101            self.logger.debug("Created listening socket for qemu serial console on: 127.0.0.1:%s" % port)
102            return (sock, port)
103
104        except socket.error:
105            sock.close()
106            raise
107
108    def log(self, msg):
109        if self.logfile:
110            # It is needed to sanitize the data received from qemu
111            # because is possible to have control characters
112            msg = msg.decode("utf-8", errors='ignore')
113            msg = re_control_char.sub('', msg)
114            self.msg += msg
115            with codecs.open(self.logfile, "a", encoding="utf-8") as f:
116                f.write("%s" % msg)
117
118    def getOutput(self, o):
119        import fcntl
120        fl = fcntl.fcntl(o, fcntl.F_GETFL)
121        fcntl.fcntl(o, fcntl.F_SETFL, fl | os.O_NONBLOCK)
122        return os.read(o.fileno(), 1000000).decode("utf-8")
123
124
125    def handleSIGCHLD(self, signum, frame):
126        if self.runqemu and self.runqemu.poll():
127            if self.runqemu.returncode:
128                self.logger.error('runqemu exited with code %d' % self.runqemu.returncode)
129                self.logger.error('Output from runqemu:\n%s' % self.getOutput(self.runqemu.stdout))
130                self.stop()
131                self._dump_host()
132
133    def start(self, qemuparams = None, get_ip = True, extra_bootparams = None, runqemuparams='', launch_cmd=None, discard_writes=True):
134        env = os.environ.copy()
135        if self.display:
136            env["DISPLAY"] = self.display
137            # Set this flag so that Qemu doesn't do any grabs as SDL grabs
138            # interact badly with screensavers.
139            env["QEMU_DONT_GRAB"] = "1"
140        if not os.path.exists(self.rootfs):
141            self.logger.error("Invalid rootfs %s" % self.rootfs)
142            return False
143        if not os.path.exists(self.tmpdir):
144            self.logger.error("Invalid TMPDIR path %s" % self.tmpdir)
145            return False
146        else:
147            env["OE_TMPDIR"] = self.tmpdir
148        if not os.path.exists(self.deploy_dir_image):
149            self.logger.error("Invalid DEPLOY_DIR_IMAGE path %s" % self.deploy_dir_image)
150            return False
151        else:
152            env["DEPLOY_DIR_IMAGE"] = self.deploy_dir_image
153
154        if self.tmpfsdir:
155            env["RUNQEMU_TMPFS_DIR"] = self.tmpfsdir
156
157        if not launch_cmd:
158            launch_cmd = 'runqemu %s' % ('snapshot' if discard_writes else '')
159            if self.use_kvm:
160                self.logger.debug('Using kvm for runqemu')
161                launch_cmd += ' kvm'
162            else:
163                self.logger.debug('Not using kvm for runqemu')
164            if not self.display:
165                launch_cmd += ' nographic'
166            if self.use_slirp:
167                launch_cmd += ' slirp'
168            if self.use_ovmf:
169                launch_cmd += ' ovmf'
170            launch_cmd += ' %s %s %s' % (runqemuparams, self.machine, self.rootfs)
171
172        return self.launch(launch_cmd, qemuparams=qemuparams, get_ip=get_ip, extra_bootparams=extra_bootparams, env=env)
173
174    def launch(self, launch_cmd, get_ip = True, qemuparams = None, extra_bootparams = None, env = None):
175        try:
176            if self.serial_ports >= 2:
177                self.threadsock, threadport = self.create_socket()
178            self.server_socket, self.serverport = self.create_socket()
179        except socket.error as msg:
180            self.logger.error("Failed to create listening socket: %s" % msg[1])
181            return False
182
183        bootparams = ' printk.time=1'
184        if extra_bootparams:
185            bootparams = bootparams + ' ' + extra_bootparams
186
187        # Ask QEMU to store the QEMU process PID in file, this way we don't have to parse running processes
188        # and analyze descendents in order to determine it.
189        if os.path.exists(self.qemu_pidfile):
190            os.remove(self.qemu_pidfile)
191        self.qemuparams = 'bootparams="{0}" qemuparams="-pidfile {1}"'.format(bootparams, self.qemu_pidfile)
192        if qemuparams:
193            self.qemuparams = self.qemuparams[:-1] + " " + qemuparams + " " + '\"'
194
195        if self.serial_ports >= 2:
196            launch_cmd += ' tcpserial=%s:%s %s' % (threadport, self.serverport, self.qemuparams)
197        else:
198            launch_cmd += ' tcpserial=%s %s' % (self.serverport, self.qemuparams)
199
200        self.origchldhandler = signal.getsignal(signal.SIGCHLD)
201        signal.signal(signal.SIGCHLD, self.handleSIGCHLD)
202
203        self.logger.debug('launchcmd=%s'%(launch_cmd))
204
205        # FIXME: We pass in stdin=subprocess.PIPE here to work around stty
206        # blocking at the end of the runqemu script when using this within
207        # oe-selftest (this makes stty error out immediately). There ought
208        # to be a proper fix but this will suffice for now.
209        self.runqemu = subprocess.Popen(launch_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, preexec_fn=os.setpgrp, env=env)
210        output = self.runqemu.stdout
211
212        #
213        # We need the preexec_fn above so that all runqemu processes can easily be killed
214        # (by killing their process group). This presents a problem if this controlling
215        # process itself is killed however since those processes don't notice the death
216        # of the parent and merrily continue on.
217        #
218        # Rather than hack runqemu to deal with this, we add something here instead.
219        # Basically we fork off another process which holds an open pipe to the parent
220        # and also is setpgrp. If/when the pipe sees EOF from the parent dieing, it kills
221        # the process group. This is like pctrl's PDEATHSIG but for a process group
222        # rather than a single process.
223        #
224        r, w = os.pipe()
225        self.monitorpid = os.fork()
226        if self.monitorpid:
227            os.close(r)
228            self.monitorpipe = os.fdopen(w, "w")
229        else:
230            # child process
231            os.setpgrp()
232            os.close(w)
233            r = os.fdopen(r)
234            x = r.read()
235            os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
236            sys.exit(0)
237
238        self.logger.debug("runqemu started, pid is %s" % self.runqemu.pid)
239        self.logger.debug("waiting at most %s seconds for qemu pid (%s)" %
240                          (self.runqemutime, time.strftime("%D %H:%M:%S")))
241        endtime = time.time() + self.runqemutime
242        while not self.is_alive() and time.time() < endtime:
243            if self.runqemu.poll():
244                if self.runqemu_exited:
245                    return False
246                if self.runqemu.returncode:
247                    # No point waiting any longer
248                    self.logger.warning('runqemu exited with code %d' % self.runqemu.returncode)
249                    self._dump_host()
250                    self.logger.warning("Output from runqemu:\n%s" % self.getOutput(output))
251                    self.stop()
252                    return False
253            time.sleep(0.5)
254
255        if self.runqemu_exited:
256            return False
257
258        if not self.is_alive():
259            self.logger.error("Qemu pid didn't appear in %s seconds (%s)" %
260                              (self.runqemutime, time.strftime("%D %H:%M:%S")))
261
262            qemu_pid = None
263            if os.path.isfile(self.qemu_pidfile):
264                with open(self.qemu_pidfile, 'r') as f:
265                    qemu_pid = f.read().strip()
266
267            self.logger.error("Status information, poll status: %s, pidfile exists: %s, pidfile contents %s, proc pid exists %s"
268                % (self.runqemu.poll(), os.path.isfile(self.qemu_pidfile), str(qemu_pid), os.path.exists("/proc/" + str(qemu_pid))))
269
270            # Dump all processes to help us to figure out what is going on...
271            ps = subprocess.Popen(['ps', 'axww', '-o', 'pid,ppid,pri,ni,command '], stdout=subprocess.PIPE).communicate()[0]
272            processes = ps.decode("utf-8")
273            self.logger.debug("Running processes:\n%s" % processes)
274            self._dump_host()
275            op = self.getOutput(output)
276            self.stop()
277            if op:
278                self.logger.error("Output from runqemu:\n%s" % op)
279            else:
280                self.logger.error("No output from runqemu.\n")
281            return False
282
283        # We are alive: qemu is running
284        out = self.getOutput(output)
285        netconf = False # network configuration is not required by default
286        self.logger.debug("qemu started in %s seconds - qemu procces pid is %s (%s)" %
287                          (time.time() - (endtime - self.runqemutime),
288                           self.qemupid, time.strftime("%D %H:%M:%S")))
289        cmdline = ''
290        if get_ip:
291            with open('/proc/%s/cmdline' % self.qemupid) as p:
292                cmdline = p.read()
293                # It is needed to sanitize the data received
294                # because is possible to have control characters
295                cmdline = re_control_char.sub(' ', cmdline)
296            try:
297                if self.use_slirp:
298                    tcp_ports = cmdline.split("hostfwd=tcp::")[1]
299                    host_port = tcp_ports[:tcp_ports.find('-')]
300                    self.ip = "localhost:%s" % host_port
301                else:
302                    ips = re.findall(r"((?:[0-9]{1,3}\.){3}[0-9]{1,3})", cmdline.split("ip=")[1])
303                    self.ip = ips[0]
304                    self.server_ip = ips[1]
305                self.logger.debug("qemu cmdline used:\n{}".format(cmdline))
306            except (IndexError, ValueError):
307                # Try to get network configuration from runqemu output
308                match = re.match(r'.*Network configuration: (?:ip=)*([0-9.]+)::([0-9.]+):([0-9.]+)$.*',
309                                 out, re.MULTILINE|re.DOTALL)
310                if match:
311                    self.ip, self.server_ip, self.netmask = match.groups()
312                    # network configuration is required as we couldn't get it
313                    # from the runqemu command line, so qemu doesn't run kernel
314                    # and guest networking is not configured
315                    netconf = True
316                else:
317                    self.logger.error("Couldn't get ip from qemu command line and runqemu output! "
318                                 "Here is the qemu command line used:\n%s\n"
319                                 "and output from runqemu:\n%s" % (cmdline, out))
320                    self._dump_host()
321                    self.stop()
322                    return False
323
324        self.logger.debug("Target IP: %s" % self.ip)
325        self.logger.debug("Server IP: %s" % self.server_ip)
326
327        if self.serial_ports >= 2:
328            self.thread = LoggingThread(self.log, self.threadsock, self.logger)
329            self.thread.start()
330            if not self.thread.connection_established.wait(self.boottime):
331                self.logger.error("Didn't receive a console connection from qemu. "
332                             "Here is the qemu command line used:\n%s\nand "
333                             "output from runqemu:\n%s" % (cmdline, out))
334                self.stop_thread()
335                return False
336
337        self.logger.debug("Output from runqemu:\n%s", out)
338        self.logger.debug("Waiting at most %d seconds for login banner (%s)" %
339                          (self.boottime, time.strftime("%D %H:%M:%S")))
340        endtime = time.time() + self.boottime
341        socklist = [self.server_socket]
342        reachedlogin = False
343        stopread = False
344        qemusock = None
345        bootlog = b''
346        data = b''
347        while time.time() < endtime and not stopread:
348            try:
349                sread, swrite, serror = select.select(socklist, [], [], 5)
350            except InterruptedError:
351                continue
352            for sock in sread:
353                if sock is self.server_socket:
354                    qemusock, addr = self.server_socket.accept()
355                    qemusock.setblocking(0)
356                    socklist.append(qemusock)
357                    socklist.remove(self.server_socket)
358                    self.logger.debug("Connection from %s:%s" % addr)
359                else:
360                    data = data + sock.recv(1024)
361                    if data:
362                        bootlog += data
363                        if self.serial_ports < 2:
364                            # this socket has mixed console/kernel data, log it to logfile
365                            self.log(data)
366
367                        data = b''
368                        if self.boot_patterns['search_reached_prompt'] in bootlog:
369                            self.server_socket = qemusock
370                            stopread = True
371                            reachedlogin = True
372                            self.logger.debug("Reached login banner in %s seconds (%s)" %
373                                              (time.time() - (endtime - self.boottime),
374                                              time.strftime("%D %H:%M:%S")))
375                    else:
376                        # no need to check if reachedlogin unless we support multiple connections
377                        self.logger.debug("QEMU socket disconnected before login banner reached. (%s)" %
378                                          time.strftime("%D %H:%M:%S"))
379                        socklist.remove(sock)
380                        sock.close()
381                        stopread = True
382
383
384        if not reachedlogin:
385            if time.time() >= endtime:
386                self.logger.warning("Target didn't reach login banner in %d seconds (%s)" %
387                                  (self.boottime, time.strftime("%D %H:%M:%S")))
388            tail = lambda l: "\n".join(l.splitlines()[-25:])
389            bootlog = bootlog.decode("utf-8")
390            # in case bootlog is empty, use tail qemu log store at self.msg
391            lines = tail(bootlog if bootlog else self.msg)
392            self.logger.warning("Last 25 lines of text:\n%s" % lines)
393            self.logger.warning("Check full boot log: %s" % self.logfile)
394            self._dump_host()
395            self.stop()
396            return False
397
398        # If we are not able to login the tests can continue
399        try:
400            (status, output) = self.run_serial(self.boot_patterns['send_login_user'], raw=True, timeout=120)
401            if re.search(self.boot_patterns['search_login_succeeded'], output):
402                self.logged = True
403                self.logger.debug("Logged as root in serial console")
404                if netconf:
405                    # configure guest networking
406                    cmd = "ifconfig eth0 %s netmask %s up\n" % (self.ip, self.netmask)
407                    output = self.run_serial(cmd, raw=True)[1]
408                    if re.search(r"root@[a-zA-Z0-9\-]+:~#", output):
409                        self.logger.debug("configured ip address %s", self.ip)
410                    else:
411                        self.logger.debug("Couldn't configure guest networking")
412            else:
413                self.logger.warning("Couldn't login into serial console"
414                            " as root using blank password")
415                self.logger.warning("The output:\n%s" % output)
416        except:
417            self.logger.warning("Serial console failed while trying to login")
418        return True
419
420    def stop(self):
421        if hasattr(self, "origchldhandler"):
422            signal.signal(signal.SIGCHLD, self.origchldhandler)
423        self.stop_thread()
424        self.stop_qemu_system()
425        if self.runqemu:
426            if hasattr(self, "monitorpid"):
427                os.kill(self.monitorpid, signal.SIGKILL)
428                self.logger.debug("Sending SIGTERM to runqemu")
429                try:
430                    os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
431                except OSError as e:
432                    if e.errno != errno.ESRCH:
433                        raise
434            endtime = time.time() + self.runqemutime
435            while self.runqemu.poll() is None and time.time() < endtime:
436                time.sleep(1)
437            if self.runqemu.poll() is None:
438                self.logger.debug("Sending SIGKILL to runqemu")
439                os.killpg(os.getpgid(self.runqemu.pid), signal.SIGKILL)
440            self.runqemu.stdin.close()
441            self.runqemu.stdout.close()
442            self.runqemu_exited = True
443
444        if hasattr(self, 'server_socket') and self.server_socket:
445            self.server_socket.close()
446            self.server_socket = None
447        if hasattr(self, 'threadsock') and self.threadsock:
448            self.threadsock.close()
449            self.threadsock = None
450        self.qemupid = None
451        self.ip = None
452        if os.path.exists(self.qemu_pidfile):
453            try:
454                os.remove(self.qemu_pidfile)
455            except FileNotFoundError as e:
456                # We raced, ignore
457                pass
458        if self.monitorpipe:
459            self.monitorpipe.close()
460
461    def stop_qemu_system(self):
462        if self.qemupid:
463            try:
464                # qemu-system behaves well and a SIGTERM is enough
465                os.kill(self.qemupid, signal.SIGTERM)
466            except ProcessLookupError as e:
467                self.logger.warning('qemu-system ended unexpectedly')
468
469    def stop_thread(self):
470        if self.thread and self.thread.is_alive():
471            self.thread.stop()
472            self.thread.join()
473
474    def restart(self, qemuparams = None):
475        self.logger.warning("Restarting qemu process")
476        if self.runqemu.poll() is None:
477            self.stop()
478        if self.start(qemuparams):
479            return True
480        return False
481
482    def is_alive(self):
483        if not self.runqemu or self.runqemu.poll() is not None or self.runqemu_exited:
484            return False
485        if os.path.isfile(self.qemu_pidfile):
486            # when handling pidfile, qemu creates the file, stat it, lock it and then write to it
487            # so it's possible that the file has been created but the content is empty
488            pidfile_timeout = time.time() + 3
489            while time.time() < pidfile_timeout:
490                with open(self.qemu_pidfile, 'r') as f:
491                    qemu_pid = f.read().strip()
492                # file created but not yet written contents
493                if not qemu_pid:
494                    time.sleep(0.5)
495                    continue
496                else:
497                    if os.path.exists("/proc/" + qemu_pid):
498                        self.qemupid = int(qemu_pid)
499                        return True
500        return False
501
502    def run_serial(self, command, raw=False, timeout=60):
503        # We assume target system have echo to get command status
504        if not raw:
505            command = "%s; echo $?\n" % command
506
507        data = ''
508        status = 0
509        self.server_socket.sendall(command.encode('utf-8'))
510        start = time.time()
511        end = start + timeout
512        while True:
513            now = time.time()
514            if now >= end:
515                data += "<<< run_serial(): command timed out after %d seconds without output >>>\r\n\r\n" % timeout
516                break
517            try:
518                sread, _, _ = select.select([self.server_socket],[],[], end - now)
519            except InterruptedError:
520                continue
521            if sread:
522                answer = self.server_socket.recv(1024)
523                if answer:
524                    data += answer.decode('utf-8')
525                    # Search the prompt to stop
526                    if re.search(self.boot_patterns['search_cmd_finished'], data):
527                        break
528                else:
529                    raise Exception("No data on serial console socket")
530
531        if data:
532            if raw:
533                status = 1
534            else:
535                # Remove first line (command line) and last line (prompt)
536                data = data[data.find('$?\r\n')+4:data.rfind('\r\n')]
537                index = data.rfind('\r\n')
538                if index == -1:
539                    status_cmd = data
540                    data = ""
541                else:
542                    status_cmd = data[index+2:]
543                    data = data[:index]
544                if (status_cmd == "0"):
545                    status = 1
546        return (status, str(data))
547
548
549    def _dump_host(self):
550        self.host_dumper.create_dir("qemu")
551        self.logger.warning("Qemu ended unexpectedly, dump data from host"
552                " is in %s" % self.host_dumper.dump_dir)
553        self.host_dumper.dump_host()
554
555# This class is for reading data from a socket and passing it to logfunc
556# to be processed. It's completely event driven and has a straightforward
557# event loop. The mechanism for stopping the thread is a simple pipe which
558# will wake up the poll and allow for tearing everything down.
559class LoggingThread(threading.Thread):
560    def __init__(self, logfunc, sock, logger):
561        self.connection_established = threading.Event()
562        self.serversock = sock
563        self.logfunc = logfunc
564        self.logger = logger
565        self.readsock = None
566        self.running = False
567
568        self.errorevents = select.POLLERR | select.POLLHUP | select.POLLNVAL
569        self.readevents = select.POLLIN | select.POLLPRI
570
571        threading.Thread.__init__(self, target=self.threadtarget)
572
573    def threadtarget(self):
574        try:
575            self.eventloop()
576        finally:
577            self.teardown()
578
579    def run(self):
580        self.logger.debug("Starting logging thread")
581        self.readpipe, self.writepipe = os.pipe()
582        threading.Thread.run(self)
583
584    def stop(self):
585        self.logger.debug("Stopping logging thread")
586        if self.running:
587            os.write(self.writepipe, bytes("stop", "utf-8"))
588
589    def teardown(self):
590        self.logger.debug("Tearing down logging thread")
591        self.close_socket(self.serversock)
592
593        if self.readsock is not None:
594            self.close_socket(self.readsock)
595
596        self.close_ignore_error(self.readpipe)
597        self.close_ignore_error(self.writepipe)
598        self.running = False
599
600    def eventloop(self):
601        poll = select.poll()
602        event_read_mask = self.errorevents | self.readevents
603        poll.register(self.serversock.fileno())
604        poll.register(self.readpipe, event_read_mask)
605
606        breakout = False
607        self.running = True
608        self.logger.debug("Starting thread event loop")
609        while not breakout:
610            events = poll.poll()
611            for event in events:
612                # An error occurred, bail out
613                if event[1] & self.errorevents:
614                    raise Exception(self.stringify_event(event[1]))
615
616                # Event to stop the thread
617                if self.readpipe == event[0]:
618                    self.logger.debug("Stop event received")
619                    breakout = True
620                    break
621
622                # A connection request was received
623                elif self.serversock.fileno() == event[0]:
624                    self.logger.debug("Connection request received")
625                    self.readsock, _ = self.serversock.accept()
626                    self.readsock.setblocking(0)
627                    poll.unregister(self.serversock.fileno())
628                    poll.register(self.readsock.fileno(), event_read_mask)
629
630                    self.logger.debug("Setting connection established event")
631                    self.connection_established.set()
632
633                # Actual data to be logged
634                elif self.readsock.fileno() == event[0]:
635                    data = self.recv(1024)
636                    self.logfunc(data)
637
638    # Since the socket is non-blocking make sure to honor EAGAIN
639    # and EWOULDBLOCK.
640    def recv(self, count):
641        try:
642            data = self.readsock.recv(count)
643        except socket.error as e:
644            if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
645                return ''
646            else:
647                raise
648
649        if data is None:
650            raise Exception("No data on read ready socket")
651        elif not data:
652            # This actually means an orderly shutdown
653            # happened. But for this code it counts as an
654            # error since the connection shouldn't go away
655            # until qemu exits.
656            raise Exception("Console connection closed unexpectedly")
657
658        return data
659
660    def stringify_event(self, event):
661        val = ''
662        if select.POLLERR == event:
663            val = 'POLLER'
664        elif select.POLLHUP == event:
665            val = 'POLLHUP'
666        elif select.POLLNVAL == event:
667            val = 'POLLNVAL'
668        return val
669
670    def close_socket(self, sock):
671        sock.shutdown(socket.SHUT_RDWR)
672        sock.close()
673
674    def close_ignore_error(self, fd):
675        try:
676            os.close(fd)
677        except OSError:
678            pass
679