#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#    transaction(
#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    ...
#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    )
#
# One line transactions are also supported:
#
#    transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import atexit
import json
import os
import re
import readline
import sys
from typing import (
    Iterator,
    List,
    NoReturn,
    Optional,
    Sequence,
)


sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import QMPMessage


class QMPCompleter:
    """
    Prefix-matching completer over a growing list of candidate strings.

    Candidates are kept in insertion order; `complete` follows the
    ``(text, state)`` calling convention of a readline completer function.
    """

    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register `value` as a completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the `state`-th candidate (0-based) that starts with `text`.

        Returns None once `state` runs past the last matching candidate.
        """
        hits = [name for name in self._matches if name.startswith(text)]
        if 0 <= state < len(hits):
            return hits[state]
        return None


class QMPShellError(Exception):
    """Raised when a command line typed into the shell cannot be parsed."""


class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that maps the JSON spellings "true/false/null" onto
    Python constants.

    Bare names ``true``, ``false`` and ``null`` found in a Python AST are
    replaced by constant nodes holding True, False and None respectively,
    so that Python can evaluate the expression; other names are untouched.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """Swap a JSON literal name for its Python constant, if it is one."""
        json_literals = {'true': True, 'false': False, 'null': None}
        if node.id not in json_literals:
            return node
        return ast.Constant(value=json_literals[node.id])


# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that turns ``name key=value ...`` lines into QMP
    commands, sends them, and prints the replies.

    Also supports a multi-line "transaction(" ... ")" mode in which each
    line is cached as an action and sent as one 'transaction' command.
    """

    def __init__(self, address: str, pretty: bool = False,
                 verbose: bool = False):
        """
        :param address: Server address string; it is handed to the
                        inherited parse_address() before connecting.
        :param pretty: Pretty-print (indent and sort) printed JSON.
        :param verbose: Echo each outgoing command before sending it.
        """
        super().__init__(self.parse_address(address))
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        self._pretty = pretty
        # True while inside a "transaction(" ... ")" sub-shell.
        self._transmode = False
        # Actions cached so far for the transaction being composed.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.verbose = verbose

    def _fill_completion(self) -> None:
        """Populate the completer with names reported by query-commands."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Install a fresh completer and load the readline history file."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet (e.g. first run): nothing to load.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self) -> None:
        """Write the readline history file; report failure, never raise."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the text right of ``=`` into a Python value.

        Tried in order: integer; case-insensitive true/false; for text
        starting with ``{`` or ``[``, strict JSON and then a Python literal
        with JSON's true/false/null accepted.  Anything else is returned
        unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Store each ``key=value`` token into `parent`, in place.

        A dotted key such as ``a.b.c`` addresses nested dictionaries, which
        are created on demand.

        :raise QMPShellError: If a token has no ``=``, a key is set twice,
                              or a key is used both as a leaf and as a
                              container.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Every token is resolved from the root object.  Descending
            # through `parent` itself would make later tokens land inside
            # the dictionary reached by an earlier dotted key.
            node = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj
            leaf = optpath[-1]
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full key: curpath stops one level short
                    # of the component that actually collides.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The command to send, or None when there is nothing to
                 send (blank input, or a transaction action was cached).
        :raise QMPShellError: On malformed input.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)

        # Transactional CLI exit.  Tested independently of the entry check
        # so that a one-line "transaction( )" closes cleanly rather than
        # recording ")" as an action name.
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process (blank line, or a bare "transaction(")?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print `qmp_message` as JSON, indented and sorted if pretty."""
        indent = None
        if self._pretty:
            indent = 4
        jsobj = json.dumps(qmp_message, indent=indent, sort_keys=self._pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse `cmdline`, send the resulting command and print the reply.

        :return: False if the reply was None (reported as 'Disconnected'),
                 True otherwise, including after a parse error.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:  # pylint: disable=broad-except
            # Deliberately broad: this is the interactive boundary, and a
            # line that fails to parse must not take the shell down.
            print('Error while parsing command line: %s' % err)
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect, remember the greeting, and set up tab completion."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print `msg` followed by the QEMU version from the greeting."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt string reflecting whether transaction mode is active."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Run the read/execute loop, yielding once per processed line.

        The connection is closed when the loop ends, including when it is
        left through an exception or by closing the generator.
        """
        self.show_banner()
        try:
            while self.read_exec_command():
                yield
        finally:
            self.close()


class HMPShell(QMPShell):
    """
    Shell that forwards each typed line to the 'human-monitor-command'
    QMP command and prints the returned text.
    """

    def __init__(self, address: str,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent with every command; changed by the "cpu" command.
        self.__cpu_index = 0

    def __cmd_completion(self) -> None:
        """Add command names parsed from the output of 'help'."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                name = cmd.split()[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <topic>' entries parsed from the output of 'info'."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # Skip blank lines and lines without a second word.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Populate the completer from the 'help' and 'info' listings."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send `cmdline` via human-monitor-command; return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward `cmdline` to the human monitor and print the outcome.

        A leading "cpu" command is trapped first: its integer argument is
        probed and, if accepted, remembered for subsequent commands.

        :return: False if a reply was None (reported as 'Disconnected'),
                 True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Argument missing or not an integer.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx

        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the HMP welcome banner."""
        super().show_banner(msg)


def die(msg: str) -> NoReturn:
    """Write ``ERROR: <msg>`` to stderr and exit with status 1."""
    sys.stderr.write('ERROR: %s\n' % msg)
    sys.exit(1)


def main() -> None:
    """
    Parse the command line, connect to the server and run the shell.

    The server address is taken from the positional argument or, when
    that is omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse always
    # demands the argument and the QMP_SOCKET default is never consulted.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        qemu = shell_class(args.qmp_server, args.pretty, args.verbose)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    for _ in qemu.repl():
        pass


if __name__ == '__main__':
    main()