1# 2# Copyright (C) 2009-2022 Red Hat Inc. 3# 4# Authors: 5# Luiz Capitulino <lcapitulino@redhat.com> 6# John Snow <jsnow@redhat.com> 7# 8# This work is licensed under the terms of the GNU LGPL, version 2 or 9# later. See the COPYING file in the top-level directory. 10# 11 12""" 13Low-level QEMU shell on top of QMP. 14 15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 16 17positional arguments: 18 qmp_server < UNIX socket path | TCP address:port > 19 20optional arguments: 21 -h, --help show this help message and exit 22 -H, --hmp Use HMP interface 23 -N, --skip-negotiation 24 Skip negotiate (for qemu-ga) 25 -v, --verbose Verbose (echo commands sent and received) 26 -p, --pretty Pretty-print JSON 27 28 29Start QEMU with: 30 31# qemu [...] -qmp unix:./qmp-sock,server 32 33Run the shell: 34 35$ qmp-shell ./qmp-sock 36 37Commands have the following format: 38 39 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 40 41For example: 42 43(QEMU) device_add driver=e1000 id=net1 44{'return': {}} 45(QEMU) 46 47key=value pairs also support Python or JSON object literal subset notations, 48without spaces. Dictionaries/objects {} are supported as are arrays []. 49 50 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 51 52Both JSON and Python formatting should work, including both styles of 53string literal quotes. Both paradigms of literal values should work, 54including null/true/false for JSON and None/True/False for Python. 55 56 57Transactions have the following multi-line format: 58 59 transaction( 60 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 61 ... 62 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 63 ) 64 65One line transactions are also supported: 66 67 transaction( action-name1 ... ) 68 69For example: 70 71 (QEMU) transaction( 72 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 73 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 74 TRANS> ) 75 {"return": {}} 76 (QEMU) 77 78Use the -v and -p options to activate the verbose and pretty-print options, 79which will echo back the properly formatted JSON-compliant QMP that is being 80sent to QEMU, which is useful for debugging and documentation generation. 81""" 82 83import argparse 84import ast 85import json 86import logging 87import os 88import re 89import readline 90from subprocess import Popen 91import sys 92from typing import ( 93 IO, 94 Dict, 95 Iterator, 96 List, 97 NoReturn, 98 Optional, 99 Sequence, 100 cast, 101) 102 103from qemu.qmp import ( 104 ConnectError, 105 ExecuteError, 106 QMPError, 107 SocketAddrT, 108) 109from qemu.qmp.legacy import ( 110 QEMUMonitorProtocol, 111 QMPBadPortError, 112 QMPMessage, 113 QMPObject, 114) 115 116 117LOG = logging.getLogger(__name__) 118 119 120class QMPCompleter: 121 """ 122 QMPCompleter provides a readline library tab-complete behavior. 123 """ 124 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 125 # but pylint as of today does not know that List[str] is simply 'list'. 126 def __init__(self) -> None: 127 self._matches: List[str] = [] 128 129 def append(self, value: str) -> None: 130 """Append a new valid completion to the list of possibilities.""" 131 return self._matches.append(value) 132 133 def complete(self, text: str, state: int) -> Optional[str]: 134 """readline.set_completer() callback implementation.""" 135 for cmd in self._matches: 136 if cmd.startswith(text): 137 if state == 0: 138 return cmd 139 state -= 1 140 return None 141 142 143class QMPShellError(QMPError): 144 """ 145 QMP Shell Base error class. 146 """ 147 148 149class FuzzyJSON(ast.NodeTransformer): 150 """ 151 This extension of ast.NodeTransformer filters literal "true/false/null" 152 values in a Python AST and replaces them by proper "True/False/None" values 153 that Python can properly evaluate. 154 """ 155 156 @classmethod 157 def visit_Name(cls, # pylint: disable=invalid-name 158 node: ast.Name) -> ast.AST: 159 """ 160 Transform Name nodes with certain values into Constant (keyword) nodes. 161 """ 162 if node.id == 'true': 163 return ast.Constant(value=True) 164 if node.id == 'false': 165 return ast.Constant(value=False) 166 if node.id == 'null': 167 return ast.Constant(value=None) 168 return node 169 170 171class QMPShell(QEMUMonitorProtocol): 172 """ 173 QMPShell provides a basic readline-based QMP shell. 174 175 :param address: Address of the QMP server. 176 :param pretty: Pretty-print QMP messages. 177 :param verbose: Echo outgoing QMP messages to console. 178 """ 179 def __init__(self, address: SocketAddrT, 180 pretty: bool = False, 181 verbose: bool = False, 182 server: bool = False, 183 logfile: Optional[str] = None): 184 super().__init__(address, server=server) 185 self._greeting: Optional[QMPMessage] = None 186 self._completer = QMPCompleter() 187 self._transmode = False 188 self._actions: List[QMPMessage] = [] 189 self._histfile = os.path.join(os.path.expanduser('~'), 190 '.qmp-shell_history') 191 self.pretty = pretty 192 self.verbose = verbose 193 self.logfile = None 194 195 if logfile is not None: 196 self.logfile = open(logfile, "w", encoding='utf-8') 197 198 def close(self) -> None: 199 # Hook into context manager of parent to save shell history. 200 self._save_history() 201 super().close() 202 203 def _fill_completion(self) -> None: 204 try: 205 cmds = cast(List[Dict[str, str]], self.cmd('query-commands')) 206 for cmd in cmds: 207 self._completer.append(cmd['name']) 208 except ExecuteError: 209 pass 210 211 def _completer_setup(self) -> None: 212 self._completer = QMPCompleter() 213 self._fill_completion() 214 readline.set_history_length(1024) 215 readline.set_completer(self._completer.complete) 216 readline.parse_and_bind("tab: complete") 217 # NB: default delimiters conflict with some command names 218 # (eg. query-), clearing everything as it doesn't seem to matter 219 readline.set_completer_delims('') 220 try: 221 readline.read_history_file(self._histfile) 222 except FileNotFoundError: 223 pass 224 except IOError as err: 225 msg = f"Failed to read history '{self._histfile}': {err!s}" 226 LOG.warning(msg) 227 228 def _save_history(self) -> None: 229 try: 230 readline.write_history_file(self._histfile) 231 except IOError as err: 232 msg = f"Failed to save history file '{self._histfile}': {err!s}" 233 LOG.warning(msg) 234 235 @classmethod 236 def _parse_value(cls, val: str) -> object: 237 try: 238 return int(val) 239 except ValueError: 240 pass 241 242 if val.lower() == 'true': 243 return True 244 if val.lower() == 'false': 245 return False 246 if val.startswith(('{', '[')): 247 # Try first as pure JSON: 248 try: 249 return json.loads(val) 250 except ValueError: 251 pass 252 # Try once again as FuzzyJSON: 253 try: 254 tree = ast.parse(val, mode='eval') 255 transformed = FuzzyJSON().visit(tree) 256 return ast.literal_eval(transformed) 257 except (SyntaxError, ValueError): 258 pass 259 return val 260 261 def _cli_expr(self, 262 tokens: Sequence[str], 263 parent: QMPObject) -> None: 264 for arg in tokens: 265 (key, sep, val) = arg.partition('=') 266 if sep != '=': 267 raise QMPShellError( 268 f"Expected a key=value pair, got '{arg!s}'" 269 ) 270 271 value = self._parse_value(val) 272 optpath = key.split('.') 273 curpath = [] 274 for path in optpath[:-1]: 275 curpath.append(path) 276 obj = parent.get(path, {}) 277 if not isinstance(obj, dict): 278 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 279 raise QMPShellError(msg.format('.'.join(curpath))) 280 parent[path] = obj 281 parent = obj 282 if optpath[-1] in parent: 283 if isinstance(parent[optpath[-1]], dict): 284 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 285 raise QMPShellError(msg.format('.'.join(curpath))) 286 raise QMPShellError(f'Cannot set "{key}" multiple times') 287 parent[optpath[-1]] = value 288 289 def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 290 """ 291 Build a QMP input object from a user provided command-line in the 292 following format: 293 294 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 295 """ 296 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 297 cmdargs = re.findall(argument_regex, cmdline) 298 qmpcmd: QMPMessage 299 300 # Transactional CLI entry: 301 if cmdargs and cmdargs[0] == 'transaction(': 302 self._transmode = True 303 self._actions = [] 304 cmdargs.pop(0) 305 306 # Transactional CLI exit: 307 if cmdargs and cmdargs[0] == ')' and self._transmode: 308 self._transmode = False 309 if len(cmdargs) > 1: 310 msg = 'Unexpected input after close of Transaction sub-shell' 311 raise QMPShellError(msg) 312 qmpcmd = { 313 'execute': 'transaction', 314 'arguments': {'actions': self._actions} 315 } 316 return qmpcmd 317 318 # No args, or no args remaining 319 if not cmdargs: 320 return None 321 322 if self._transmode: 323 # Parse and cache this Transactional Action 324 finalize = False 325 action = {'type': cmdargs[0], 'data': {}} 326 if cmdargs[-1] == ')': 327 cmdargs.pop(-1) 328 finalize = True 329 self._cli_expr(cmdargs[1:], action['data']) 330 self._actions.append(action) 331 return self._build_cmd(')') if finalize else None 332 333 # Standard command: parse and return it to be executed. 334 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 335 self._cli_expr(cmdargs[1:], qmpcmd['arguments']) 336 return qmpcmd 337 338 def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None: 339 jsobj = json.dumps(qmp_message, 340 indent=4 if self.pretty else None, 341 sort_keys=self.pretty) 342 print(str(jsobj), file=fh) 343 344 def _execute_cmd(self, cmdline: str) -> bool: 345 try: 346 qmpcmd = self._build_cmd(cmdline) 347 except QMPShellError as err: 348 print( 349 f"Error while parsing command line: {err!s}\n" 350 "command format: <command-name> " 351 "[arg-name1=arg1] ... [arg-nameN=argN", 352 file=sys.stderr 353 ) 354 return True 355 # For transaction mode, we may have just cached the action: 356 if qmpcmd is None: 357 return True 358 if self.verbose: 359 self._print(qmpcmd) 360 resp = self.cmd_obj(qmpcmd) 361 if resp is None: 362 print('Disconnected') 363 return False 364 self._print(resp) 365 if self.logfile is not None: 366 cmd = {**qmpcmd, **resp} 367 self._print(cmd, fh=self.logfile) 368 return True 369 370 def connect(self, negotiate: bool = True) -> None: 371 self._greeting = super().connect(negotiate) 372 self._completer_setup() 373 374 def show_banner(self, 375 msg: str = 'Welcome to the QMP low-level shell!') -> None: 376 """ 377 Print to stdio a greeting, and the QEMU version if available. 378 """ 379 print(msg) 380 if not self._greeting: 381 print('Connected') 382 return 383 version = self._greeting['QMP']['version']['qemu'] 384 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 385 386 @property 387 def prompt(self) -> str: 388 """ 389 Return the current shell prompt, including a trailing space. 390 """ 391 if self._transmode: 392 return 'TRANS> ' 393 return '(QEMU) ' 394 395 def read_exec_command(self) -> bool: 396 """ 397 Read and execute a command. 398 399 @return True if execution was ok, return False if disconnected. 400 """ 401 try: 402 cmdline = input(self.prompt) 403 except EOFError: 404 print() 405 return False 406 407 if cmdline == '': 408 for event in self.get_events(): 409 print(event) 410 return True 411 412 return self._execute_cmd(cmdline) 413 414 def repl(self) -> Iterator[None]: 415 """ 416 Return an iterator that implements the REPL. 417 """ 418 self.show_banner() 419 while self.read_exec_command(): 420 yield 421 self.close() 422 423 424class HMPShell(QMPShell): 425 """ 426 HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. 427 428 :param address: Address of the QMP server. 429 :param pretty: Pretty-print QMP messages. 430 :param verbose: Echo outgoing QMP messages to console. 431 """ 432 def __init__(self, address: SocketAddrT, 433 pretty: bool = False, 434 verbose: bool = False, 435 server: bool = False, 436 logfile: Optional[str] = None): 437 super().__init__(address, pretty, verbose, server, logfile) 438 self._cpu_index = 0 439 440 def _cmd_completion(self) -> None: 441 for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): 442 if cmd and cmd[0] != '[' and cmd[0] != '\t': 443 name = cmd.split()[0] # drop help text 444 if name == 'info': 445 continue 446 if name.find('|') != -1: 447 # Command in the form 'foobar|f' or 'f|foobar', take the 448 # full name 449 opt = name.split('|') 450 if len(opt[0]) == 1: 451 name = opt[1] 452 else: 453 name = opt[0] 454 self._completer.append(name) 455 self._completer.append('help ' + name) # help completion 456 457 def _info_completion(self) -> None: 458 for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): 459 if cmd: 460 self._completer.append('info ' + cmd.split()[1]) 461 462 def _other_completion(self) -> None: 463 # special cases 464 self._completer.append('help info') 465 466 def _fill_completion(self) -> None: 467 self._cmd_completion() 468 self._info_completion() 469 self._other_completion() 470 471 def _cmd_passthrough(self, cmdline: str, 472 cpu_index: int = 0) -> QMPMessage: 473 return self.cmd_obj({ 474 'execute': 'human-monitor-command', 475 'arguments': { 476 'command-line': cmdline, 477 'cpu-index': cpu_index 478 } 479 }) 480 481 def _execute_cmd(self, cmdline: str) -> bool: 482 if cmdline.split()[0] == "cpu": 483 # trap the cpu command, it requires special setting 484 try: 485 idx = int(cmdline.split()[1]) 486 if 'return' not in self._cmd_passthrough('info version', idx): 487 print('bad CPU index') 488 return True 489 self._cpu_index = idx 490 except ValueError: 491 print('cpu command takes an integer argument') 492 return True 493 resp = self._cmd_passthrough(cmdline, self._cpu_index) 494 if resp is None: 495 print('Disconnected') 496 return False 497 assert 'return' in resp or 'error' in resp 498 if 'return' in resp: 499 # Success 500 if len(resp['return']) > 0: 501 print(resp['return'], end=' ') 502 else: 503 # Error 504 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 505 return True 506 507 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 508 QMPShell.show_banner(self, msg) 509 510 511def die(msg: str) -> NoReturn: 512 """Write an error to stderr, then exit with a return code of 1.""" 513 sys.stderr.write('ERROR: %s\n' % msg) 514 sys.exit(1) 515 516 517def main() -> None: 518 """ 519 qmp-shell entry point: parse command line arguments and start the REPL. 520 """ 521 parser = argparse.ArgumentParser() 522 parser.add_argument('-H', '--hmp', action='store_true', 523 help='Use HMP interface') 524 parser.add_argument('-N', '--skip-negotiation', action='store_true', 525 help='Skip negotiate (for qemu-ga)') 526 parser.add_argument('-v', '--verbose', action='store_true', 527 help='Verbose (echo commands sent and received)') 528 parser.add_argument('-p', '--pretty', action='store_true', 529 help='Pretty-print JSON') 530 parser.add_argument('-l', '--logfile', 531 help='Save log of all QMP messages to PATH') 532 533 default_server = os.environ.get('QMP_SOCKET') 534 parser.add_argument('qmp_server', action='store', 535 default=default_server, 536 help='< UNIX socket path | TCP address:port >') 537 538 args = parser.parse_args() 539 if args.qmp_server is None: 540 parser.error("QMP socket or TCP address must be specified") 541 542 shell_class = HMPShell if args.hmp else QMPShell 543 544 try: 545 address = shell_class.parse_address(args.qmp_server) 546 except QMPBadPortError: 547 parser.error(f"Bad port number: {args.qmp_server}") 548 return # pycharm doesn't know error() is noreturn 549 550 with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu: 551 try: 552 qemu.connect(negotiate=not args.skip_negotiation) 553 except ConnectError as err: 554 if isinstance(err.exc, OSError): 555 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 556 die(str(err)) 557 558 for _ in qemu.repl(): 559 pass 560 561 562def main_wrap() -> None: 563 """ 564 qmp-shell-wrap entry point: parse command line arguments and 565 start the REPL. 566 """ 567 parser = argparse.ArgumentParser() 568 parser.add_argument('-H', '--hmp', action='store_true', 569 help='Use HMP interface') 570 parser.add_argument('-v', '--verbose', action='store_true', 571 help='Verbose (echo commands sent and received)') 572 parser.add_argument('-p', '--pretty', action='store_true', 573 help='Pretty-print JSON') 574 parser.add_argument('-l', '--logfile', 575 help='Save log of all QMP messages to PATH') 576 577 parser.add_argument('command', nargs=argparse.REMAINDER, 578 help='QEMU command line to invoke') 579 580 args = parser.parse_args() 581 582 cmd = args.command 583 if len(cmd) != 0 and cmd[0] == '--': 584 cmd = cmd[1:] 585 if len(cmd) == 0: 586 cmd = ["qemu-system-x86_64"] 587 588 sockpath = "qmp-shell-wrap-%d" % os.getpid() 589 cmd += ["-qmp", "unix:%s" % sockpath] 590 591 shell_class = HMPShell if args.hmp else QMPShell 592 593 try: 594 address = shell_class.parse_address(sockpath) 595 except QMPBadPortError: 596 parser.error(f"Bad port number: {sockpath}") 597 return # pycharm doesn't know error() is noreturn 598 599 try: 600 with shell_class(address, args.pretty, args.verbose, 601 True, args.logfile) as qemu: 602 with Popen(cmd): 603 604 try: 605 qemu.accept() 606 except ConnectError as err: 607 if isinstance(err.exc, OSError): 608 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 609 die(str(err)) 610 611 for _ in qemu.repl(): 612 pass 613 finally: 614 os.unlink(sockpath) 615 616 617if __name__ == '__main__': 618 main() 619