"""
(Legacy) Sync QMP Wrapper

This module provides the `QEMUMonitorProtocol` class, which is a
synchronous wrapper around `QMPClient`.

Its design closely resembles that of the original QEMUMonitorProtocol
class, originally written by Luiz Capitulino. It is provided here for
compatibility with scripts inside the QEMU source tree that expect the
old interface.
"""

#
# Copyright (C) 2009-2022 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#  John Snow <jsnow@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#

import asyncio
import socket
from types import TracebackType
from typing import (
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
)

from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient


#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]

# QMPMessage can be outgoing commands or incoming events/returns.
# QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'command-returns-exceptions', it can actually be anything.
#
# {'return': {}} is a QMPMessage,
# {} is the QMPReturnValue.


class QMPBadPortError(QMPError):
    """
    Unable to parse socket address: Port was non-numerical.
    """


class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
    and then allow to handle commands and events.

    :param address: QEMU address, can be a unix socket path (string), a tuple
                    in the form ( address, port ) for a TCP connection, or an
                    existing `socket.socket` object.
    :param server: Act as the socket server. (See 'accept')
                   Not applicable when passing a socket directly.
    :param nickname: Optional nickname used for logging.

    :raise ValueError: If ``server`` is true and ``address`` is a socket.
    """

    def __init__(self,
                 address: Union[SocketAddrT, socket.socket],
                 server: bool = False,
                 nickname: Optional[str] = None):

        if server and isinstance(address, socket.socket):
            raise ValueError(
                "server argument should be False when passing a socket")

        self._qmp = QMPClient(nickname)
        try:
            self._aloop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop is available (e.g. in a non-main
            # thread, or on Python versions that no longer create one
            # implicitly). This is a sync shim, so make our own.
            self._aloop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._aloop)
        self._address = address
        # Timeout applied to command execution; see settimeout().
        self._timeout: Optional[float] = None

        if server:
            # Narrow the type: the socket case was rejected above.
            assert not isinstance(self._address, socket.socket)
            self._sync(self._qmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on our event loop and return its result.

        :param future: The awaitable to run.
        :param timeout: Timeout in seconds, or None to wait indefinitely.

        :raise asyncio.TimeoutError: If the timeout period elapses.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the client's stored greeting as a dict, or None if unset."""
        if self._qmp.greeting is not None:
            # pylint: disable=protected-access
            return self._qmp.greeting._asdict()
        return None

    def __enter__(self: _T) -> _T:
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    @classmethod
    def parse_address(cls, address: str) -> SocketAddrT:
        """
        Parse a string into a QMP address.

        Figure out if the argument is in the host:port form.
        If it's not, it's probably a file path.

        :param address: Either ``host:port`` or a file path.
        :return: A ``(host, port)`` tuple, or the unmodified path string.
        :raise QMPBadPortError: If the port component is not an integer.
        """
        components = address.split(':')
        if len(components) == 2:
            try:
                port = int(components[1])
            except ValueError:
                msg = f"Bad port: '{components[1]}' in '{address}'."
                raise QMPBadPortError(msg) from None
            return (components[0], port)

        # Treat as filepath.
        return address

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        :param negotiate:
            Whether to await the greeting and negotiate capabilities.

        :return: QMP greeting dict, or None if negotiate is false
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = negotiate
        self._qmp.negotiate = negotiate

        self._sync(
            self._qmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        :param timeout:
            timeout in seconds (nonnegative float number, or None).
            If None, there is no timeout, and this may block forever.

        :return: QMP greeting dict
        :raise ConnectError: on connection errors
        :raise asyncio.TimeoutError: If the timeout period elapses.
        """
        self._qmp.await_greeting = True
        self._qmp.negotiate = True

        self._sync(self._qmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        :param qmp_cmd: QMP command to be sent as a Python dict
        :return: QMP response as a Python dict
        :raise asyncio.TimeoutError:
            If the timeout configured via `settimeout` elapses.
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._qmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    def cmd_raw(self, name: str,
                args: Optional[Dict[str, object]] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        :param name: command name (string)
        :param args: command arguments (dict); omitted from the message
                     when None or empty.
        :return: QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        return self.cmd_obj(qmp_cmd)

    def cmd(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        :param cmd: command name (string)
        :param kwds: command arguments, passed as keyword arguments
        :return: The value produced by `QMPClient.execute` for this command.
        :raise asyncio.TimeoutError:
            If the timeout configured via `settimeout` elapses.
        """
        return self._sync(
            self._qmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        :param wait:
            If False or 0, do not wait. Return None if no events ready.
            If True, wait forever until the next event.
            Otherwise, wait for the specified number of seconds.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: The first available QMP event, or None.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._qmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout = None
        if wait and not isinstance(wait, bool):
            # Any non-bool number is a duration in seconds. Integers are
            # accepted too; treating them like True would block forever.
            timeout = float(wait)

        return dict(
            self._sync(
                self._qmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of QMP events and clear all pending events.

        :param wait:
            If False or 0, do not wait. Return an empty list if no
            events are ready.
            If True, wait until we have at least one event.
            Otherwise, wait for up to the specified number of seconds for at
            least one event.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: A list of QMP events.
        """
        events = [dict(x) for x in self._qmp.events.clear()]
        if events:
            return events

        # Nothing was pending; fall back to (possibly) waiting for one.
        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear current list of pending events."""
        self._qmp.events.clear()

    def close(self) -> None:
        """Close the connection."""
        self._sync(
            self._qmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout for QMP RPC execution.

        This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.
        The `accept`, `pull_event` and `get_events` methods have their
        own configurable timeouts.

        :param timeout:
            timeout in seconds, or None.
            None will wait indefinitely.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor to send.
        """
        self._qmp.send_fd_scm(fd)

    def __del__(self) -> None:
        qmp = getattr(self, '_qmp', None)
        aloop = getattr(self, '_aloop', None)
        if qmp is None or aloop is None:
            # __init__ raised before these were assigned, so there is
            # no connection that could need closing.
            return

        if qmp.runstate == Runstate.IDLE:
            return

        if not aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )