"""
(Legacy) Sync QMP Wrapper

This module provides the `QEMUMonitorProtocol` class, which is a
synchronous wrapper around `QMPClient`.

Its design closely resembles that of the original QEMUMonitorProtocol
class, originally written by Luiz Capitulino. It is provided here for
compatibility with scripts inside the QEMU source tree that expect the
old interface.
"""

#
# Copyright (C) 2009-2022 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#  John Snow <jsnow@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#

import asyncio
import socket
from types import TracebackType
from typing import (
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
)

from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient
from .util import get_or_create_event_loop


#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]

# QMPMessage can be outgoing commands or incoming events/returns.
# QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'command-returns-exceptions', it can actually be anything.
#
# {'return': {}} is a QMPMessage,
# {} is the QMPReturnValue.


class QMPBadPortError(QMPError):
    """
    Unable to parse socket address: Port was non-numerical.
    """


class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
    and then allow to handle commands and events.

    :param address: QEMU address, can be a unix socket path (string), a tuple
                    in the form ( address, port ) for a TCP connection, or an
                    existing `socket.socket` object.
    :param server: Act as the socket server. (See 'accept')
                   Not applicable when passing a socket directly.
    :param nickname: Optional nickname used for logging.

    :raise ValueError: If ``server`` is True and ``address`` is a socket.
    """

    def __init__(self,
                 address: Union[SocketAddrT, socket.socket],
                 server: bool = False,
                 nickname: Optional[str] = None):

        # NOTE: this check runs before any attribute is assigned, so
        # __del__ must tolerate an instance with no attributes at all.
        if server and isinstance(address, socket.socket):
            raise ValueError(
                "server argument should be False when passing a socket")

        self._qmp = QMPClient(nickname)
        self._address = address
        # Timeout applied to command execution; see settimeout().
        self._timeout: Optional[float] = None

        # This is a sync shim intended for use in fully synchronous
        # programs. Create and set an event loop if necessary.
        self._aloop = get_or_create_event_loop()

        if server:
            # Already guaranteed by the check above; narrows the type.
            assert not isinstance(self._address, socket.socket)
            self._sync(self._qmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on this object's event loop.

        :param future: The awaitable to run.
        :param timeout: Timeout in seconds; None waits indefinitely.

        :raise asyncio.TimeoutError: When the timeout period elapses.

        :return: The result of the awaitable.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """
        Return the greeting recorded by the client as a dict, if any.
        """
        if self._qmp.greeting is not None:
            # pylint: disable=protected-access
            return self._qmp.greeting._asdict()
        return None

    def __enter__(self: _T) -> _T:
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    @classmethod
    def parse_address(cls, address: str) -> SocketAddrT:
        """
        Parse a string into a QMP address.

        Figure out if the argument is in the host:port form.
        If it's not, it's probably a file path.

        :param address: The address string to parse.

        :raise QMPBadPortError:
            When the string has exactly one ':' and the part after it
            is not an integer.

        :return: A (host, port) tuple, or the unmodified string.
        """
        components = address.split(':')
        if len(components) == 2:
            try:
                port = int(components[1])
            except ValueError:
                msg = f"Bad port: '{components[1]}' in '{address}'."
                raise QMPBadPortError(msg) from None
            return (components[0], port)

        # Treat as filepath.
        return address

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        :param negotiate:
            Whether to await the greeting and negotiate capabilities.

        :return: QMP greeting dict, or None if negotiate is false
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = negotiate
        self._qmp.negotiate = negotiate

        self._sync(
            self._qmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        :param timeout:
            timeout in seconds (nonnegative float number, or None).
            If None, there is no timeout, and this may block forever.

        :return: QMP greeting dict
        :raise ConnectError: on connection errors
        :raise asyncio.TimeoutError: When the timeout period elapses.
        """
        self._qmp.await_greeting = True
        self._qmp.negotiate = True

        self._sync(self._qmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        :param qmp_cmd: QMP command to be sent as a Python dict
        :return: QMP response as a Python dict
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._qmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    def cmd_raw(self, name: str,
                args: Optional[Dict[str, object]] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        :param name: command name (string)
        :param args: command arguments (dict)
        :return: QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        # An empty or missing args dict sends no 'arguments' key at all.
        if args:
            qmp_cmd['arguments'] = args
        return self.cmd_obj(qmp_cmd)

    def cmd(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        :param cmd: command name (string)
        :param kwds: command arguments, passed as keyword arguments
        :return: The value returned by `QMPClient.execute`.
        """
        return self._sync(
            self._qmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        :param wait:
            If False or 0, do not wait. Return None if no events ready.
            If True, wait forever until the next event.
            Otherwise, wait for the specified number of seconds
            (int or float).

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: The first available QMP event, or None.
        """
        # Kick the event loop to allow events to accumulate
        self._sync(asyncio.sleep(0))

        if not wait and self._qmp.events.empty():
            # wait is False/0: "do not wait, do not except."
            return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        #
        # Any other truthy number is a timeout in seconds. Test for
        # "not a bool" rather than "is a float" so that an integer
        # timeout is honoured instead of silently blocking forever.
        timeout: Optional[float] = None
        if wait and not isinstance(wait, bool):
            timeout = wait

        return dict(
            self._sync(
                self._qmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of QMP events and clear all pending events.

        :param wait:
            If False or 0, do not wait. Return an empty list if no
            events are ready.
            If True, wait until we have at least one event.
            Otherwise, wait for up to the specified number of seconds for at
            least one event.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: A list of QMP events.
        """
        events = [dict(x) for x in self._qmp.events.clear()]
        if events:
            return events

        # Nothing was pending; fall back to pulling (at most) one event.
        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear current list of pending events."""
        self._qmp.events.clear()

    def close(self) -> None:
        """Close the connection."""
        self._sync(
            self._qmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout for QMP RPC execution.

        This timeout affects the `cmd`, `cmd_raw`, and `cmd_obj` methods.
        The `accept`, `pull_event` and `get_events` methods have their
        own configurable timeouts.

        :param timeout:
            timeout in seconds, or None.
            None will wait indefinitely.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.
        """
        self._qmp.send_fd_scm(fd)

    def __del__(self) -> None:
        qmp = getattr(self, '_qmp', None)
        aloop = getattr(self, '_aloop', None)
        if qmp is None or aloop is None:
            # __init__ raised before it finished assigning attributes.
            # Both are assigned before __init__ makes its first _sync()
            # call, so no session can be open and there is nothing to
            # clean up.
            return

        if qmp.runstate == Runstate.IDLE:
            return

        qmp.logger.warning(
            "QEMUMonitorProtocol object garbage collected without a prior "
            "call to close()"
        )

        if not aloop.is_running():
            # If the user neglected to close the QMP session and we
            # are not currently running in an asyncio context, we
            # have the opportunity to close the QMP session. If we
            # do not do this, the error messages presented over
            # dangling async resources may not make any sense to the
            # user.
            self.close()

        if qmp.runstate != Runstate.IDLE:
            # If QMP is still not quiesced, it means that the garbage
            # collector ran from a context within the event loop and we
            # are simply too late to take any corrective action. Raise
            # our own error to give meaningful feedback to the user in
            # order to prevent pages of asyncio stacktrace jargon.
            raise QMPError(
                "QEMUMonitorProtocol.close() was not called before object was "
                "garbage collected, and could not be closed due to GC running "
                "in the event loop"
            )