"""
QEMU qtest library

qtest offers the QEMUQtestProtocol and QEMUQtestMachine classes, which
offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
subclass of QEMUMachine, respectively.
"""

# Copyright (C) 2015 Red Hat Inc.
#
# Authors:
#  Fam Zheng <famz@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Based on qmp.py.
#

import os
import socket
from typing import (
    List,
    Optional,
    Sequence,
    TextIO,
)

from qemu.qmp import SocketAddrT  # pylint: disable=import-error

from .machine import QEMUMachine


class QEMUQtestProtocol:
    """
    QEMUQtestProtocol implements a connection to a qtest socket.

    :param address: QEMU address, can be either a unix socket path (string)
                    or a tuple in the form ( address, port ) for a TCP
                    connection
    :param server: server mode, listens on the socket (bool)
    :raise socket.error: on socket connection errors

    .. note::
       No connection is established by __init__(), this is done
       by the connect() or accept() methods.
    """
    def __init__(self, address: SocketAddrT,
                 server: bool = False):
        self._address = address
        self._sock = self._get_sock()
        # Text-mode reader over the connected socket; created by
        # connect()/accept() and cleared again by close().
        self._sockfile: Optional[TextIO] = None
        if server:
            try:
                self._sock.bind(self._address)
                self._sock.listen(1)
            except OSError:
                # Do not leak the descriptor when bind()/listen() fails.
                self._sock.close()
                raise

    def _get_sock(self) -> socket.socket:
        """
        Create an unconnected stream socket matching the address type.

        A tuple address selects AF_INET, anything else selects AF_UNIX.
        """
        if isinstance(self._address, tuple):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        return socket.socket(family, socket.SOCK_STREAM)

    def connect(self) -> None:
        """
        Connect to the qtest socket.

        :raise socket.error: on socket connection errors
        """
        self._sock.connect(self._address)
        self._sockfile = self._sock.makefile(mode='r')

    def accept(self) -> None:
        """
        Await connection from QEMU.

        On success the listening socket is closed and replaced by the
        accepted connection; on failure the listening socket is kept.

        :raise socket.error: on socket connection errors
        """
        listener = self._sock
        conn, _ = listener.accept()
        # The listener is no longer reachable once self._sock is rebound,
        # so close it explicitly instead of leaving it to the GC.
        listener.close()
        self._sock = conn
        self._sockfile = conn.makefile(mode='r')

    def cmd(self, qtest_cmd: str) -> str:
        """
        Send a qtest command on the wire.

        :param qtest_cmd: qtest command text to be sent
        :return: one line read back from the socket after sending
        """
        # connect() or accept() must have been called beforehand.
        assert self._sockfile is not None
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
        return self._sockfile.readline()

    def close(self) -> None:
        """
        Close this socket.

        The file wrapper created by makefile(), if any, is closed too.
        """
        if self._sockfile is not None:
            self._sockfile.close()
            self._sockfile = None
        self._sock.close()

    def settimeout(self, timeout: Optional[float]) -> None:
        """Set a timeout, in seconds."""
        self._sock.settimeout(timeout)


class QEMUQtestMachine(QEMUMachine):
    """
    A QEMU VM, with a qtest socket available.
    """

    def __init__(self,
                 binary: str,
                 args: Sequence[str] = (),
                 wrapper: Sequence[str] = (),
                 name: Optional[str] = None,
                 base_temp_dir: str = "/var/tmp",
                 sock_dir: Optional[str] = None,
                 qmp_timer: Optional[float] = None):
        # pylint: disable=too-many-arguments

        if name is None:
            name = f"qemu-{os.getpid()}"
        if sock_dir is None:
            sock_dir = base_temp_dir
        super().__init__(binary, args, wrapper=wrapper, name=name,
                         base_temp_dir=base_temp_dir,
                         sock_dir=sock_dir, qmp_timer=qmp_timer)
        # Created in _pre_launch() and released in _post_shutdown().
        self._qtest: Optional[QEMUQtestProtocol] = None
        self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")

    @property
    def _base_args(self) -> List[str]:
        """Base arguments of the parent plus the qtest socket options."""
        args = super()._base_args
        args.extend([
            '-qtest', f"unix:path={self._qtest_path}",
            '-accel', 'qtest'
        ])
        return args

    def _pre_launch(self) -> None:
        """Start listening on the qtest socket path before launching."""
        super()._pre_launch()
        self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)

    def _post_launch(self) -> None:
        """Accept the qtest connection once the parent hook has run."""
        assert self._qtest is not None
        super()._post_launch()
        self._qtest.accept()

    def _post_shutdown(self) -> None:
        """Close the qtest connection and remove its socket path."""
        super()._post_shutdown()
        if self._qtest is not None:
            # Release the socket opened in _pre_launch(); without this
            # every launch/shutdown cycle leaks a descriptor.
            self._qtest.close()
            self._qtest = None
        self._remove_if_exists(self._qtest_path)

    def qtest(self, cmd: str) -> str:
        """
        Send a qtest command to the guest.

        :param cmd: qtest command to send
        :return: qtest server response
        :raise RuntimeError: if no qtest socket is currently available
        """
        if self._qtest is None:
            raise RuntimeError("qtest socket not available")
        return self._qtest.cmd(cmd)