xref: /openbmc/qemu/python/qemu/qmp/qmp_shell.py (revision 41511ed734dbf32f3c42ece60db0b86e081de4d2)
1#
2# Copyright (C) 2009-2022 Red Hat Inc.
3#
4# Authors:
5#  Luiz Capitulino <lcapitulino@redhat.com>
6#  John Snow <jsnow@redhat.com>
7#
8# This work is licensed under the terms of the GNU LGPL, version 2 or
9# later. See the COPYING file in the top-level directory.
10#
11
12"""
13qmp-shell - An interactive QEMU shell powered by QMP
14
15qmp-shell offers a simple shell with a convenient shorthand syntax as an
16alternative to typing JSON by hand. This syntax is not standardized and
17is not meant to be used as a scriptable interface. This shorthand *may*
18change incompatibly in the future, and it is strongly encouraged to use
19the QMP library to provide API-stable scripting when needed.
20
21usage: qmp-shell [-h] [-H] [-v] [-p] [-l LOGFILE] [-N] qmp_server
22
23positional arguments:
24  qmp_server            < UNIX socket path | TCP address:port >
25
26optional arguments:
27  -h, --help            show this help message and exit
28  -H, --hmp             Use HMP interface
29  -v, --verbose         Verbose (echo commands sent and received)
30  -p, --pretty          Pretty-print JSON
31  -l LOGFILE, --logfile LOGFILE
32                        Save log of all QMP messages to PATH
33  -N, --skip-negotiation
34                        Skip negotiate (for qemu-ga)
35
36Usage
37-----
38
39First, start QEMU with::
40
41    > qemu [...] -qmp unix:./qmp-sock,server=on[,wait=off]
42
43Then run the shell, passing the address of the socket::
44
45    > qmp-shell ./qmp-sock
46
47Syntax
48------
49
50Commands have the following format::
51
52    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
53
54For example, to add a network device::
55
56    (QEMU) device_add driver=e1000 id=net1
57    {'return': {}}
58    (QEMU)
59
60key=value pairs support either Python or JSON object literal notations,
61**without spaces**. Dictionaries/objects ``{}`` are supported, as are
62arrays ``[]``::
63
64    example-command arg-name1={'key':'value','obj'={'prop':"value"}}
65
66Either JSON or Python formatting for compound values works, including
67both styles of string literal quotes (either single or double
68quotes). Both paradigms of literal values are accepted, including
69``null/true/false`` for JSON and ``None/True/False`` for Python.
70
71Transactions
72------------
73
74Transactions have the following multi-line format::
75
76   transaction(
77   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
78   ...
79   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
80   )
81
82One line transactions are also supported::
83
84   transaction( action-name1 ... )
85
86For example::
87
88    (QEMU) transaction(
89    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
90    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
91    TRANS> )
92    {"return": {}}
93    (QEMU)
94
95Commands
96--------
97
98Autocomplete of command names using <tab> is supported. Pressing <tab>
99at a blank CLI prompt will show you a list of all available commands
100that the connected QEMU instance supports.
101
102For documentation on QMP commands and their arguments, please see
103`qmp ref`.
104
105Events
106------
107
108qmp-shell will display events received from the server, but this version
109does not do so asynchronously. To check for new events from the server,
110press <enter> on a blank line::
111
112    (QEMU) ⏎
113    {'timestamp': {'seconds': 1660071944, 'microseconds': 184667},
114     'event': 'STOP'}
115
116Display options
117---------------
118
119Use the -v and -p options to activate the verbose and pretty-print
120options, which will echo back the properly formatted JSON-compliant QMP
121that is being sent to QEMU. This is useful for debugging to see the
122wire-level QMP data being exchanged, and generating output for use in
123writing documentation for QEMU.
124"""
125
126import argparse
127import ast
128import json
129import logging
130import os
131import re
132import readline
133from subprocess import Popen
134import sys
135from typing import (
136    IO,
137    Dict,
138    Iterator,
139    List,
140    NoReturn,
141    Optional,
142    Sequence,
143    cast,
144)
145
146from qemu.qmp import (
147    ConnectError,
148    ExecuteError,
149    QMPError,
150    SocketAddrT,
151)
152from qemu.qmp.legacy import (
153    QEMUMonitorProtocol,
154    QMPBadPortError,
155    QMPMessage,
156    QMPObject,
157)
158
159
160LOG = logging.getLogger(__name__)
161
162
163class QMPCompleter:
164    """
165    QMPCompleter provides a readline library tab-complete behavior.
166    """
167    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
168    # but pylint as of today does not know that List[str] is simply 'list'.
169    def __init__(self) -> None:
170        self._matches: List[str] = []
171
172    def append(self, value: str) -> None:
173        """Append a new valid completion to the list of possibilities."""
174        return self._matches.append(value)
175
176    def complete(self, text: str, state: int) -> Optional[str]:
177        """readline.set_completer() callback implementation."""
178        for cmd in self._matches:
179            if cmd.startswith(text):
180                if state == 0:
181                    return cmd
182                state -= 1
183        return None
184
185
186class QMPShellError(QMPError):
187    """
188    QMP Shell Base error class.
189    """
190
191
192class FuzzyJSON(ast.NodeTransformer):
193    """
194    This extension of ast.NodeTransformer filters literal "true/false/null"
195    values in a Python AST and replaces them by proper "True/False/None" values
196    that Python can properly evaluate.
197    """
198
199    @classmethod
200    def visit_Name(cls,  # pylint: disable=invalid-name
201                   node: ast.Name) -> ast.AST:
202        """
203        Transform Name nodes with certain values into Constant (keyword) nodes.
204        """
205        if node.id == 'true':
206            return ast.Constant(value=True)
207        if node.id == 'false':
208            return ast.Constant(value=False)
209        if node.id == 'null':
210            return ast.Constant(value=None)
211        return node
212
213
214class QMPShell(QEMUMonitorProtocol):
215    """
216    QMPShell provides a basic readline-based QMP shell.
217
218    :param address: Address of the QMP server.
219    :param pretty: Pretty-print QMP messages.
220    :param verbose: Echo outgoing QMP messages to console.
221    """
222    def __init__(self, address: SocketAddrT,
223                 pretty: bool = False,
224                 verbose: bool = False,
225                 server: bool = False,
226                 logfile: Optional[str] = None):
227        super().__init__(address, server=server)
228        self._greeting: Optional[QMPMessage] = None
229        self._completer = QMPCompleter()
230        self._transmode = False
231        self._actions: List[QMPMessage] = []
232        self._histfile = os.path.join(os.path.expanduser('~'),
233                                      '.qmp-shell_history')
234        self.pretty = pretty
235        self.verbose = verbose
236        self.logfile = None
237
238        if logfile is not None:
239            self.logfile = open(logfile, "w", encoding='utf-8')
240
241    def close(self) -> None:
242        # Hook into context manager of parent to save shell history.
243        self._save_history()
244        super().close()
245
246    def _fill_completion(self) -> None:
247        try:
248            cmds = cast(List[Dict[str, str]], self.cmd('query-commands'))
249            for cmd in cmds:
250                self._completer.append(cmd['name'])
251        except ExecuteError:
252            pass
253
254    def _completer_setup(self) -> None:
255        self._completer = QMPCompleter()
256        self._fill_completion()
257        readline.set_history_length(1024)
258        readline.set_completer(self._completer.complete)
259        readline.parse_and_bind("tab: complete")
260        # NB: default delimiters conflict with some command names
261        # (eg. query-), clearing everything as it doesn't seem to matter
262        readline.set_completer_delims('')
263        try:
264            readline.read_history_file(self._histfile)
265        except FileNotFoundError:
266            pass
267        except IOError as err:
268            msg = f"Failed to read history '{self._histfile}': {err!s}"
269            LOG.warning(msg)
270
271    def _save_history(self) -> None:
272        try:
273            readline.write_history_file(self._histfile)
274        except IOError as err:
275            msg = f"Failed to save history file '{self._histfile}': {err!s}"
276            LOG.warning(msg)
277
278    @classmethod
279    def _parse_value(cls, val: str) -> object:
280        try:
281            return int(val)
282        except ValueError:
283            pass
284
285        if val.lower() == 'true':
286            return True
287        if val.lower() == 'false':
288            return False
289        if val.startswith(('{', '[')):
290            # Try first as pure JSON:
291            try:
292                return json.loads(val)
293            except ValueError:
294                pass
295            # Try once again as FuzzyJSON:
296            try:
297                tree = ast.parse(val, mode='eval')
298                transformed = FuzzyJSON().visit(tree)
299                return ast.literal_eval(transformed)
300            except (SyntaxError, ValueError):
301                pass
302        return val
303
304    def _cli_expr(self,
305                  tokens: Sequence[str],
306                  parent: QMPObject) -> None:
307        for arg in tokens:
308            (key, sep, val) = arg.partition('=')
309            if sep != '=':
310                raise QMPShellError(
311                    f"Expected a key=value pair, got '{arg!s}'"
312                )
313
314            value = self._parse_value(val)
315            optpath = key.split('.')
316            curpath = []
317            for path in optpath[:-1]:
318                curpath.append(path)
319                obj = parent.get(path, {})
320                if not isinstance(obj, dict):
321                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
322                    raise QMPShellError(msg.format('.'.join(curpath)))
323                parent[path] = obj
324                parent = obj
325            if optpath[-1] in parent:
326                if isinstance(parent[optpath[-1]], dict):
327                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
328                    raise QMPShellError(msg.format('.'.join(curpath)))
329                raise QMPShellError(f'Cannot set "{key}" multiple times')
330            parent[optpath[-1]] = value
331
332    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
333        """
334        Build a QMP input object from a user provided command-line in the
335        following format:
336
337            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
338        """
339        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
340        cmdargs = re.findall(argument_regex, cmdline)
341        qmpcmd: QMPMessage
342
343        # Transactional CLI entry:
344        if cmdargs and cmdargs[0] == 'transaction(':
345            self._transmode = True
346            self._actions = []
347            cmdargs.pop(0)
348
349        # Transactional CLI exit:
350        if cmdargs and cmdargs[0] == ')' and self._transmode:
351            self._transmode = False
352            if len(cmdargs) > 1:
353                msg = 'Unexpected input after close of Transaction sub-shell'
354                raise QMPShellError(msg)
355            qmpcmd = {
356                'execute': 'transaction',
357                'arguments': {'actions': self._actions}
358            }
359            return qmpcmd
360
361        # No args, or no args remaining
362        if not cmdargs:
363            return None
364
365        if self._transmode:
366            # Parse and cache this Transactional Action
367            finalize = False
368            action = {'type': cmdargs[0], 'data': {}}
369            if cmdargs[-1] == ')':
370                cmdargs.pop(-1)
371                finalize = True
372            self._cli_expr(cmdargs[1:], action['data'])
373            self._actions.append(action)
374            return self._build_cmd(')') if finalize else None
375
376        # Standard command: parse and return it to be executed.
377        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
378        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
379        return qmpcmd
380
381    def _print(self, qmp_message: object, fh: IO[str] = sys.stdout) -> None:
382        jsobj = json.dumps(qmp_message,
383                           indent=4 if self.pretty else None,
384                           sort_keys=self.pretty)
385        print(str(jsobj), file=fh)
386
387    def _execute_cmd(self, cmdline: str) -> bool:
388        try:
389            qmpcmd = self._build_cmd(cmdline)
390        except QMPShellError as err:
391            print(
392                f"Error while parsing command line: {err!s}\n"
393                "command format: <command-name> "
394                "[arg-name1=arg1] ... [arg-nameN=argN",
395                file=sys.stderr
396            )
397            return True
398        # For transaction mode, we may have just cached the action:
399        if qmpcmd is None:
400            return True
401        if self.verbose:
402            self._print(qmpcmd)
403        resp = self.cmd_obj(qmpcmd)
404        if resp is None:
405            print('Disconnected')
406            return False
407        self._print(resp)
408        if self.logfile is not None:
409            cmd = {**qmpcmd, **resp}
410            self._print(cmd, fh=self.logfile)
411        return True
412
413    def connect(self, negotiate: bool = True) -> None:
414        self._greeting = super().connect(negotiate)
415        self._completer_setup()
416
417    def show_banner(self,
418                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
419        """
420        Print to stdio a greeting, and the QEMU version if available.
421        """
422        print(msg)
423        if not self._greeting:
424            print('Connected')
425            return
426        version = self._greeting['QMP']['version']['qemu']
427        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
428
429    @property
430    def prompt(self) -> str:
431        """
432        Return the current shell prompt, including a trailing space.
433        """
434        if self._transmode:
435            return 'TRANS> '
436        return '(QEMU) '
437
438    def read_exec_command(self) -> bool:
439        """
440        Read and execute a command.
441
442        @return True if execution was ok, return False if disconnected.
443        """
444        try:
445            cmdline = input(self.prompt)
446        except EOFError:
447            print()
448            return False
449
450        if cmdline == '':
451            for event in self.get_events():
452                print(event)
453            return True
454
455        return self._execute_cmd(cmdline)
456
457    def repl(self) -> Iterator[None]:
458        """
459        Return an iterator that implements the REPL.
460        """
461        self.show_banner()
462        while self.read_exec_command():
463            yield
464        self.close()
465
466
467class HMPShell(QMPShell):
468    """
469    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
470
471    :param address: Address of the QMP server.
472    :param pretty: Pretty-print QMP messages.
473    :param verbose: Echo outgoing QMP messages to console.
474    """
475    def __init__(self, address: SocketAddrT,
476                 pretty: bool = False,
477                 verbose: bool = False,
478                 server: bool = False,
479                 logfile: Optional[str] = None):
480        super().__init__(address, pretty, verbose, server, logfile)
481        self._cpu_index = 0
482
483    def _cmd_completion(self) -> None:
484        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
485            if cmd and cmd[0] != '[' and cmd[0] != '\t':
486                name = cmd.split()[0]  # drop help text
487                if name == 'info':
488                    continue
489                if name.find('|') != -1:
490                    # Command in the form 'foobar|f' or 'f|foobar', take the
491                    # full name
492                    opt = name.split('|')
493                    if len(opt[0]) == 1:
494                        name = opt[1]
495                    else:
496                        name = opt[0]
497                self._completer.append(name)
498                self._completer.append('help ' + name)  # help completion
499
500    def _info_completion(self) -> None:
501        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
502            if cmd:
503                self._completer.append('info ' + cmd.split()[1])
504
505    def _other_completion(self) -> None:
506        # special cases
507        self._completer.append('help info')
508
509    def _fill_completion(self) -> None:
510        self._cmd_completion()
511        self._info_completion()
512        self._other_completion()
513
514    def _cmd_passthrough(self, cmdline: str,
515                         cpu_index: int = 0) -> QMPMessage:
516        return self.cmd_obj({
517            'execute': 'human-monitor-command',
518            'arguments': {
519                'command-line': cmdline,
520                'cpu-index': cpu_index
521            }
522        })
523
524    def _execute_cmd(self, cmdline: str) -> bool:
525        if cmdline.split()[0] == "cpu":
526            # trap the cpu command, it requires special setting
527            try:
528                idx = int(cmdline.split()[1])
529                if 'return' not in self._cmd_passthrough('info version', idx):
530                    print('bad CPU index')
531                    return True
532                self._cpu_index = idx
533            except ValueError:
534                print('cpu command takes an integer argument')
535                return True
536        resp = self._cmd_passthrough(cmdline, self._cpu_index)
537        if resp is None:
538            print('Disconnected')
539            return False
540        assert 'return' in resp or 'error' in resp
541        if 'return' in resp:
542            # Success
543            if len(resp['return']) > 0:
544                print(resp['return'], end=' ')
545        else:
546            # Error
547            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
548        return True
549
550    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
551        QMPShell.show_banner(self, msg)
552
553
554def die(msg: str) -> NoReturn:
555    """Write an error to stderr, then exit with a return code of 1."""
556    sys.stderr.write('ERROR: %s\n' % msg)
557    sys.exit(1)
558
559
560def common_parser() -> argparse.ArgumentParser:
561    """Build common parsing options used by qmp-shell and qmp-shell-wrap."""
562    parser = argparse.ArgumentParser()
563    parser.add_argument('-H', '--hmp', action='store_true',
564                        help='Use HMP interface')
565    parser.add_argument('-v', '--verbose', action='store_true',
566                        help='Verbose (echo commands sent and received)')
567    parser.add_argument('-p', '--pretty', action='store_true',
568                        help='Pretty-print JSON')
569    parser.add_argument('-l', '--logfile',
570                        help='Save log of all QMP messages to PATH')
571    # NOTE: When changing arguments, update both this module docstring
572    # and the manpage synopsis in docs/man/qmp_shell.rst.
573    return parser
574
575
576def main() -> None:
577    """
578    qmp-shell entry point: parse command line arguments and start the REPL.
579    """
580    parser = common_parser()
581    parser.add_argument('-N', '--skip-negotiation', action='store_true',
582                        help='Skip negotiate (for qemu-ga)')
583
584    default_server = os.environ.get('QMP_SOCKET')
585    parser.add_argument('qmp_server', action='store',
586                        default=default_server,
587                        help='< UNIX socket path | TCP address:port >')
588
589    args = parser.parse_args()
590    if args.qmp_server is None:
591        parser.error("QMP socket or TCP address must be specified")
592
593    shell_class = HMPShell if args.hmp else QMPShell
594
595    try:
596        address = shell_class.parse_address(args.qmp_server)
597    except QMPBadPortError:
598        parser.error(f"Bad port number: {args.qmp_server}")
599        return  # pycharm doesn't know error() is noreturn
600
601    with shell_class(address, args.pretty, args.verbose, args.logfile) as qemu:
602        try:
603            qemu.connect(negotiate=not args.skip_negotiation)
604        except ConnectError as err:
605            if isinstance(err.exc, OSError):
606                die(f"Couldn't connect to {args.qmp_server}: {err!s}")
607            die(str(err))
608
609        for _ in qemu.repl():
610            pass
611
612
613def main_wrap() -> None:
614    """
615    qmp-shell-wrap - QEMU + qmp-shell launcher utility
616
617    Launch QEMU and connect to it with `qmp-shell` in a single command.
618    CLI arguments will be forwarded to qemu, with additional arguments
619    added to allow `qmp-shell` to then connect to the recently launched
620    QEMU instance.
621
622    usage: qmp-shell-wrap [-h] [-H] [-v] [-p] [-l LOGFILE] ...
623
624    positional arguments:
625      command               QEMU command line to invoke
626
627    optional arguments:
628      -h, --help            show this help message and exit
629      -H, --hmp             Use HMP interface
630      -v, --verbose         Verbose (echo commands sent and received)
631      -p, --pretty          Pretty-print JSON
632      -l LOGFILE, --logfile LOGFILE
633                            Save log of all QMP messages to PATH
634
635    Usage
636    -----
637
638    Prepend "qmp-shell-wrap" to your usual QEMU command line::
639
640        > qmp-shell-wrap qemu-system-x86_64 -M q35 -m 4096 -display none
641        Welcome to the QMP low-level shell!
642        Connected
643        (QEMU)
644    """
645    parser = common_parser()
646    parser.add_argument('command', nargs=argparse.REMAINDER,
647                        help='QEMU command line to invoke')
648
649    args = parser.parse_args()
650
651    cmd = args.command
652    if len(cmd) != 0 and cmd[0] == '--':
653        cmd = cmd[1:]
654    if len(cmd) == 0:
655        cmd = ["qemu-system-x86_64"]
656
657    sockpath = "qmp-shell-wrap-%d" % os.getpid()
658    cmd += ["-qmp", "unix:%s" % sockpath]
659
660    shell_class = HMPShell if args.hmp else QMPShell
661
662    try:
663        address = shell_class.parse_address(sockpath)
664    except QMPBadPortError:
665        parser.error(f"Bad port number: {sockpath}")
666        return  # pycharm doesn't know error() is noreturn
667
668    try:
669        with shell_class(address, args.pretty, args.verbose,
670                         True, args.logfile) as qemu:
671            with Popen(cmd):
672
673                try:
674                    qemu.accept()
675                except ConnectError as err:
676                    if isinstance(err.exc, OSError):
677                        die(f"Couldn't connect to {args.qmp_server}: {err!s}")
678                    die(str(err))
679
680                for _ in qemu.repl():
681                    pass
682    except FileNotFoundError:
683        sys.stderr.write(f"ERROR: QEMU executable '{cmd[0]}' not found.\n")
684    finally:
685        os.unlink(sockpath)
686
687
688if __name__ == '__main__':
689    main()
690