xref: /openbmc/qemu/scripts/qmp/qmp-shell (revision 1eab8872)
1#!/usr/bin/env python3
2#
3# Low-level QEMU shell on top of QMP.
4#
5# Copyright (C) 2009, 2010 Red Hat Inc.
6#
7# Authors:
8#  Luiz Capitulino <lcapitulino@redhat.com>
9#
10# This work is licensed under the terms of the GNU GPL, version 2.  See
11# the COPYING file in the top-level directory.
12#
13# Usage:
14#
15# Start QEMU with:
16#
17# # qemu [...] -qmp unix:./qmp-sock,server
18#
19# Run the shell:
20#
21# $ qmp-shell ./qmp-sock
22#
23# Commands have the following format:
24#
25#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
26#
27# For example:
28#
29# (QEMU) device_add driver=e1000 id=net1
30# {"return": {}}
31# (QEMU)
32#
33# key=value pairs also support Python or JSON object literal subset notations,
34# without spaces. Dictionaries/objects {} are supported as are arrays [].
35#
36#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
37#
38# Both JSON and Python formatting should work, including both styles of
39# string literal quotes. Both paradigms of literal values should work,
40# including null/true/false for JSON and None/True/False for Python.
41#
42#
43# Transactions have the following multi-line format:
44#
45#    transaction(
46#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
47#    ...
48#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
49#    )
50#
51# One line transactions are also supported:
52#
53#    transaction( action-name1 ... )
54#
55# For example:
56#
57#     (QEMU) transaction(
58#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
59#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
60#     TRANS> )
61#     {"return": {}}
62#     (QEMU)
63#
64# Use the -v and -p options to activate the verbose and pretty-print options,
65# which will echo back the properly formatted JSON-compliant QMP that is being
66# sent to QEMU, which is useful for debugging and documentation generation.
67import argparse
68import ast
69import atexit
70import json
71import os
72import re
73import readline
74import sys
75from typing import (
76    Iterator,
77    List,
78    NoReturn,
79    Optional,
80    Sequence,
81)
82
83
84sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
85from qemu import qmp
86from qemu.qmp import QMPMessage
87
88
class QMPCompleter:
    """
    Minimal readline completer backed by a list of candidate strings.

    Candidates are offered in insertion order; a candidate matches when it
    starts with the text typed so far.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the candidate number `state` (0-based) among those starting
        with `text`, or None when there is no such candidate.
        """
        hits = [name for name in self._matches if name.startswith(text)]
        if 0 <= state < len(hits):
            return hits[state]
        return None
105
106
class QMPShellError(Exception):
    """Raised when a command line typed into the shell cannot be parsed."""
109
110
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets JSON-style literals appear in Python syntax.

    Bare names "true", "false" and "null" are rewritten into the constants
    True, False and None so the resulting tree can be evaluated as a Python
    literal. Every other name is left untouched.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """Replace a JSON keyword name by its Python constant, if it is one."""
        json_keywords = {'true': True, 'false': False, 'null': None}
        if node.id in json_keywords:
            return ast.Constant(value=json_keywords[node.id])
        return node
128
129
130# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
131#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that turns "command key=value ..." lines into QMP
    messages, sends them over the monitor connection and prints the replies.

    Besides plain commands it implements a "transaction(" ... ")" sub-shell
    which accumulates actions and submits them as one 'transaction' command.
    """

    def __init__(self, address: str, pretty: bool = False,
                 verbose: bool = False):
        """
        :param address: Server address, handed to parse_address().
        :param pretty: Pretty-print (indent and sort) printed JSON.
        :param verbose: Echo each outgoing QMP message before sending it.
        """
        super().__init__(self.parse_address(address))
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        self._pretty = pretty
        # True while inside a "transaction(" ... ")" sub-shell.
        self._transmode = False
        # Actions collected so far for the transaction being typed.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.verbose = verbose

    def _fill_completion(self) -> None:
        """Seed the completer with the names reported by 'query-commands'."""
        cmds = self.cmd('query-commands')
        # _execute_cmd() treats a None reply as a disconnect; do the same
        # here instead of failing on "'error' in None".
        if cmds is None or 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Configure readline: completion, history size and history file."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet (e.g. first run): nothing to load.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self) -> None:
        """Write the readline history file; report (not raise) I/O errors."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the text right of '=' into a Python value.

        Tried in order: integer, true/false (case-insensitive), and for text
        starting with '{' or '[' strict JSON followed by a Python literal
        with JSON keywords allowed. Anything else is returned as the
        original string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Store every "key=value" token into the dictionary `parent`.

        A dotted key ("a.b.c=1") addresses nested dictionaries, which are
        created on demand.

        :raise QMPShellError: When a token is not a key=value pair, when a
                              key is set twice, or when a key is used both
                              as a leaf and as a dictionary.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Every token is resolved from the root dictionary. Descending
            # through `parent` itself would make the next token start from
            # wherever the previous dotted key ended.
            target = parent
            curpath: List[str] = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = target.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                target[path] = obj
                target = obj
            leaf = optpath[-1]
            if leaf in target:
                if isinstance(target[leaf], dict):
                    # Report the full key: it is the leaf that collides.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            target[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to send
                 yet (blank input, or an action cached for a transaction).
        :raise QMPShellError: On malformed input.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry. `cmdargs` is empty for whitespace-only
        # input, so it must be checked before being indexed.
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)

        # Transactional CLI exit. Tested separately from the entry above so
        # that "transaction( )" closes an empty transaction instead of
        # recording ")" as an action.
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            # Take the pending actions unconditionally, so a failed close
            # cannot leak them into the next transaction.
            actions, self._actions = self._actions, []
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': actions}
            }
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                # One-line form: the closing paren ends the transaction.
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a message as JSON; indented and key-sorted if pretty."""
        indent = 4 if self._pretty else None
        jsobj = json.dumps(qmp_message, indent=indent, sort_keys=self._pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one command line, send the resulting message, print the reply.

        :return: False when the reply is None (reported as 'Disconnected'),
                 True otherwise, including on parse errors.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Shell boundary: report any parsing problem and keep running.
            print('Error while parsing command line: %s' % err)
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect, remember the greeting and set up readline completion."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print `msg` and, when a greeting was received, the QEMU version."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt text; differs while inside a transaction sub-shell."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints and then clears the queued events instead of
        sending anything.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner, then run commands until read_exec_command() returns
        False, yielding after each one; finally close the connection.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
347
348
class HMPShell(QMPShell):
    """
    Shell that forwards each typed line as a human monitor (HMP) command
    through the QMP 'human-monitor-command' command.

    The 'cpu <index>' command is intercepted: the index is remembered and
    sent as 'cpu-index' with every following command.
    """

    def __init__(self, address: str,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index passed along with every forwarded command.
        self.__cpu_index = 0

    def __cmd_completion(self) -> None:
        """Add the command names listed by HMP 'help' to the completer."""
        resp = self.__cmd_passthrough('help')
        # No usable reply (disconnect or error): skip completion, the same
        # way QMPShell._fill_completion() does.
        if not resp or 'return' not in resp:
            return
        for cmd in resp['return'].split('\r\n'):
            if cmd and cmd[0] not in ('[', '\t'):
                name = cmd.split()[0]  # drop help text
                if name == 'info':
                    continue
                if '|' in name:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <topic>' entries taken from the HMP 'info' output."""
        resp = self.__cmd_passthrough('info')
        if not resp or 'return' not in resp:
            return
        for cmd in resp['return'].split('\r\n'):
            words = cmd.split()
            # The topic is the second word; skip lines that have none
            # rather than failing on the index.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        """Add completions that the generated lists do not cover."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Populate the completer with HMP commands instead of QMP ones."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send `cmdline` through 'human-monitor-command'; return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward one HMP command line and print its result.

        :return: False when a reply is None (reported as 'Disconnected'),
                 True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to run.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing argument as well as a non-integer one.
                print('cpu command takes an integer argument')
                return True
            # Probe the index with a harmless command before adopting it.
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Show the banner with an HMP-specific default message."""
        super().show_banner(msg)
424
425
def die(msg: str) -> NoReturn:
    """Print 'ERROR: <msg>' on stderr and exit with status 1."""
    print(f'ERROR: {msg}', file=sys.stderr)
    sys.exit(1)
429
430
def main() -> None:
    """
    Command-line entry point: parse the options, connect to the server and
    run the interactive shell until it ends.

    The server address comes from the positional argument or, when that is
    omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional. Without it argparse demands
    # the argument, so the default is never used and the None check below
    # can never trigger.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        qemu = shell_class(args.qmp_server, args.pretty, args.verbose)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    # repl() is a generator; draining it runs the read-execute loop.
    for _ in qemu.repl():
        pass
469
470
# Run the shell only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()
473