xref: /openbmc/qemu/python/qemu/machine/qtest.py (revision 800485762e6564e04e2ab315132d477069562d91)
1"""
2QEMU qtest library
3
qtest offers the QEMUQtestProtocol and QEMUQtestMachine classes, which
5offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
6subclass of QEMUMachine, respectively.
7"""
8
9# Copyright (C) 2015 Red Hat Inc.
10#
11# Authors:
12#  Fam Zheng <famz@redhat.com>
13#
14# This work is licensed under the terms of the GNU GPL, version 2.  See
15# the COPYING file in the top-level directory.
16#
17# Based on qmp.py.
18#
19
20import os
21import socket
22from typing import (
23    List,
24    Optional,
25    Sequence,
26    TextIO,
27    Tuple,
28)
29
30from qemu.qmp import SocketAddrT
31
32from .machine import QEMUMachine
33
34
class QEMUQtestProtocol:
    """
    QEMUQtestProtocol implements a connection to a qtest socket.

    :param address: QEMU address, can be either a unix socket path (string)
                    or a tuple in the form ( address, port ) for a TCP
                    connection
    :param sock: An existing socket can be provided as an alternative to
                 an address. One of address or sock must be provided.
    :param server: server mode, listens on the socket. Only meaningful
                   in conjunction with an address and not an existing
                   socket.

    :raise ValueError: if neither or both of address and sock are given,
                       or if server mode is requested with an existing
                       socket
    :raise socket.error: on socket errors while binding or listening in
                         server mode

    .. note::
       No connection is established by __init__(), this is done
       by the connect() or accept() methods.
    """
    def __init__(self,
                 address: Optional[SocketAddrT] = None,
                 sock: Optional[socket.socket] = None,
                 server: bool = False):
        if address is None and sock is None:
            raise ValueError("Either 'address' or 'sock' must be specified")
        if address is not None and sock is not None:
            raise ValueError(
                "Either 'address' or 'sock' must be specified, but not both")
        if sock is not None and server:
            raise ValueError("server=True is meaningless when passing socket")

        self._address = address
        self._sock = sock if sock is not None else self._get_sock()
        self._sockfile: Optional[TextIO] = None

        if server:
            # Guaranteed by the validation above; kept for type narrowing.
            assert self._address is not None
            try:
                self._sock.bind(self._address)
                self._sock.listen(1)
            except OSError:
                # The socket was created by us a few lines up and nobody
                # else holds a reference yet: release it before reporting.
                self._sock.close()
                raise

    def _get_sock(self) -> socket.socket:
        """
        Create an unconnected stream socket matching the address type.

        A tuple address selects AF_INET, anything else selects AF_UNIX.
        """
        assert self._address is not None
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self) -> None:
        """
        Connect to the qtest socket.

        When the object was built around an existing socket (no address),
        no connect() call is made and only the line reader is set up.

        :raise socket.error: on socket connection errors
        """
        if self._address is not None:
            self._sock.connect(self._address)
        # cmd() encodes outgoing commands as UTF-8; decode replies the same
        # way instead of depending on the process locale.
        self._sockfile = self._sock.makefile(mode='r', encoding='utf-8')

    def accept(self) -> None:
        """
        Await connection from QEMU.

        On success the listening socket is closed and replaced by the
        accepted connection. If accept() fails the listening socket is
        left untouched.

        :raise socket.error: on socket connection errors
        """
        listener = self._sock
        conn, _ = listener.accept()
        self._sock = conn
        # The listener is no longer reachable through this object once
        # self._sock has been replaced, so close it here rather than
        # leaking its file descriptor.
        listener.close()
        self._sockfile = conn.makefile(mode='r', encoding='utf-8')

    def cmd(self, qtest_cmd: str) -> str:
        """
        Send a qtest command on the wire.

        :param qtest_cmd: qtest command text to be sent (a newline is
                          appended before sending)
        :return: one line of response as returned by readline(), i.e.
                 including its trailing newline when one was received
        """
        # connect() or accept() must have been called first.
        assert self._sockfile is not None
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
        resp = self._sockfile.readline()
        return resp

    def close(self) -> None:
        """
        Close this socket, and the line reader attached to it if any.
        """
        self._sock.close()
        if self._sockfile:
            self._sockfile.close()
            self._sockfile = None

    def settimeout(self, timeout: Optional[float]) -> None:
        """Set a timeout, in seconds."""
        self._sock.settimeout(timeout)
125
126
class QEMUQtestMachine(QEMUMachine):
    """
    A QEMU VM, with a qtest socket available.

    A socket pair is created before launch: the file descriptor of one
    end is handed to QEMU on the command line as the qtest chardev, the
    other end is wrapped in a QEMUQtestProtocol used by qtest().
    """

    def __init__(self,
                 binary: str,
                 args: Sequence[str] = (),
                 wrapper: Sequence[str] = (),
                 name: Optional[str] = None,
                 base_temp_dir: str = "/var/tmp",
                 qmp_timer: Optional[float] = None):
        # pylint: disable=too-many-arguments

        if name is None:
            # Default name is derived from the current process id.
            name = f"qemu-{os.getpid()}"
        super().__init__(binary, args, wrapper=wrapper, name=name,
                         base_temp_dir=base_temp_dir,
                         qmp_timer=qmp_timer)
        self._qtest: Optional[QEMUQtestProtocol] = None
        self._qtest_sock_pair: Optional[
            Tuple[socket.socket, socket.socket]] = None

    @property
    def _base_args(self) -> List[str]:
        """
        Base arguments from the parent class, extended with the qtest
        chardev (bound to the fd of the first socket of the pair), the
        -qtest option and the qtest accelerator.
        """
        args = super()._base_args
        # The socket pair only exists once _pre_launch() has run.
        assert self._qtest_sock_pair is not None
        fd = self._qtest_sock_pair[0].fileno()
        args.extend([
            '-chardev', f"socket,id=qtest,fd={fd}",
            '-qtest', 'chardev:qtest',
            '-accel', 'qtest'
        ])
        return args

    def _pre_launch(self) -> None:
        """
        Create the qtest socket pair and the protocol object wrapping
        our end of it.
        """
        self._qtest_sock_pair = socket.socketpair()
        # The first socket's fd number is passed on the command line, so
        # it must be inheritable by the launched process.
        os.set_inheritable(self._qtest_sock_pair[0].fileno(), True)
        super()._pre_launch()
        self._qtest = QEMUQtestProtocol(sock=self._qtest_sock_pair[1])

    def _post_launch(self) -> None:
        """
        Close our copy of the socket handed to QEMU and set up the qtest
        protocol reader on the remaining end.
        """
        assert self._qtest is not None
        super()._post_launch()
        if self._qtest_sock_pair:
            self._qtest_sock_pair[0].close()
        self._qtest.connect()

    def _post_shutdown(self) -> None:
        """
        Release the qtest protocol object and both sockets of the pair.
        """
        if self._qtest is not None:
            # close() also closes the reader created by makefile(); while
            # that reader is open, closing the socket object alone does
            # not release the underlying file descriptor. Dropping the
            # reference makes qtest() report that the socket is not
            # available instead of using a closed one.
            self._qtest.close()
            self._qtest = None
        if self._qtest_sock_pair:
            self._qtest_sock_pair[0].close()
            self._qtest_sock_pair[1].close()
            self._qtest_sock_pair = None
        super()._post_shutdown()

    def qtest(self, cmd: str) -> str:
        """
        Send a qtest command to the guest.

        :param cmd: qtest command to send
        :return: qtest server response
        :raise RuntimeError: if no qtest protocol object is currently set
        """
        if self._qtest is None:
            raise RuntimeError("qtest socket not available")
        return self._qtest.cmd(cmd)
192