xref: /openbmc/qemu/python/qemu/qmp/legacy.py (revision 800485762e6564e04e2ab315132d477069562d91)
1"""
2(Legacy) Sync QMP Wrapper
3
4This module provides the `QEMUMonitorProtocol` class, which is a
5synchronous wrapper around `QMPClient`.
6
7Its design closely resembles that of the original QEMUMonitorProtocol
8class, originally written by Luiz Capitulino. It is provided here for
9compatibility with scripts inside the QEMU source tree that expect the
10old interface.
11"""
12
13#
14# Copyright (C) 2009-2022 Red Hat Inc.
15#
16# Authors:
17#  Luiz Capitulino <lcapitulino@redhat.com>
18#  John Snow <jsnow@redhat.com>
19#
20# This work is licensed under the terms of the GNU GPL, version 2.  See
21# the COPYING file in the top-level directory.
22#
23
24import asyncio
25import socket
26from types import TracebackType
27from typing import (
28    Any,
29    Awaitable,
30    Dict,
31    List,
32    Optional,
33    Type,
34    TypeVar,
35    Union,
36)
37
38from .error import QMPError
39from .protocol import Runstate, SocketAddrT
40from .qmp_client import QMPClient
41
42
# Type aliases describing the shape of QMP traffic.
#
# A QMPMessage may be an outgoing command or an incoming event/return.
# A QMPReturnValue is usually a dict (a JSON object), but because of
# QAPI's 'command-returns-exceptions' it can actually be anything.
#
# Example: {'return': {}} is a QMPMessage, and the inner {} is its
# QMPReturnValue.

#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]
58
59
class QMPBadPortError(QMPError):
    """
    A socket address string could not be parsed because the port
    component was not a valid integer.
    """
64
65
class QEMUMonitorProtocol:
    """
    Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
    and then allow to handle commands and events.

    Every public method is synchronous: the underlying `QMPClient`
    coroutine is run to completion on the event loop held by this
    object (see `_sync`).

    :param address:  QEMU address, can be a unix socket path (string), a tuple
                     in the form ( address, port ) for a TCP connection, or an
                     existing `socket.socket` object.
    :param server:   Act as the socket server. (See 'accept')
                     Not applicable when passing a socket directly.
    :param nickname: Optional nickname used for logging.

    :raise ValueError: If ``server`` is true and ``address`` is a socket.
    """

    def __init__(self,
                 address: Union[SocketAddrT, socket.socket],
                 server: bool = False,
                 nickname: Optional[str] = None):

        if server and isinstance(address, socket.socket):
            raise ValueError(
                "server argument should be False when passing a socket")

        self._qmp = QMPClient(nickname)
        try:
            self._aloop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop for this thread. This happens in
            # non-main threads, and everywhere on Python versions where
            # get_event_loop() no longer creates a loop implicitly.
            # This is a sync shim, so create and install one ourselves.
            self._aloop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._aloop)
        self._address = address
        # Timeout applied to command execution; see settimeout().
        self._timeout: Optional[float] = None

        if server:
            assert not isinstance(self._address, socket.socket)
            self._sync(self._qmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on this object's event loop.

        :param future: The awaitable to run.
        :param timeout:
            Timeout in seconds handed to `asyncio.wait_for`, or None
            for no timeout.

        :return: Whatever the awaitable evaluates to.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the client's greeting as a dict, or None if absent."""
        if self._qmp.greeting is not None:
            # pylint: disable=protected-access
            return self._qmp.greeting._asdict()
        return None

    def __enter__(self: _T) -> _T:
        # Implement context manager enter function.
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> None:
        # Implement context manager exit function.
        self.close()

    @classmethod
    def parse_address(cls, address: str) -> SocketAddrT:
        """
        Parse a string into a QMP address.

        Figure out if the argument is in the host:port form.
        If it's not, it's probably a file path.

        :param address: Either ``host:port`` or a filesystem path.

        :return: A ``(host, port)`` tuple, or the string unchanged.
        :raise QMPBadPortError:
            If the string has exactly one colon and the part after it
            is not an integer.
        """
        components = address.split(':')
        if len(components) == 2:
            try:
                port = int(components[1])
            except ValueError:
                msg = f"Bad port: '{components[1]}' in '{address}'."
                raise QMPBadPortError(msg) from None
            return (components[0], port)

        # Treat as filepath.
        return address

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the QMP Monitor and perform capabilities negotiation.

        :param negotiate:
            Whether to await the greeting and negotiate capabilities.

        :return: QMP greeting dict, or None if negotiate is false
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = negotiate
        self._qmp.negotiate = negotiate

        self._sync(
            self._qmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Await connection from QMP Monitor and perform capabilities negotiation.

        :param timeout:
            timeout in seconds (nonnegative float number, or None).
            If None, there is no timeout, and this may block forever.

        :return: QMP greeting dict
        :raise ConnectError: on connection errors
        """
        self._qmp.await_greeting = True
        self._qmp.negotiate = True

        self._sync(self._qmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a QMP command to the QMP Monitor.

        The timeout set via `settimeout` applies.

        :param qmp_cmd: QMP command to be sent as a Python dict
        :return: QMP response as a Python dict
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._qmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    def cmd_raw(self, name: str,
                args: Optional[Dict[str, object]] = None) -> QMPMessage:
        """
        Build a QMP command and send it to the QMP Monitor.

        :param name: command name (string)
        :param args:
            command arguments (dict). Omitted from the message when
            None or empty.

        :return: QMP response as a Python dict
        """
        qmp_cmd: QMPMessage = {'execute': name}
        if args:
            qmp_cmd['arguments'] = args
        return self.cmd_obj(qmp_cmd)

    def cmd(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Build and send a QMP command to the monitor, report errors if any

        The timeout set via `settimeout` applies.

        :param cmd: command name (string)
        :param kwds: command arguments, passed as keyword arguments

        :return: The value produced by `QMPClient.execute`.
        """
        return self._sync(
            self._qmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pulls a single event.

        :param wait:
            If False or 0, do not wait. Return None if no events ready.
            If True, wait forever until the next event.
            Otherwise, wait for the specified number of seconds.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: The first available QMP event, or None.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._qmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        if wait and not isinstance(wait, bool):
            # Any non-bool number is a duration. Accept ints as well as
            # floats, otherwise wait=5 would silently wait forever.
            timeout = float(wait)

        return dict(
            self._sync(
                self._qmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Get a list of QMP events and clear all pending events.

        :param wait:
            If False or 0, do not wait. Return an empty list if no
            events are ready.
            If True, wait until we have at least one event.
            Otherwise, wait for up to the specified number of seconds for at
            least one event.

        :raise asyncio.TimeoutError:
            When a timeout is requested and the timeout period elapses.

        :return: A list of QMP events.
        """
        events = [dict(x) for x in self._qmp.events.clear()]
        if events:
            return events

        # Nothing was pending; fall back to (possibly) waiting for one.
        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear current list of pending events."""
        self._qmp.events.clear()

    def close(self) -> None:
        """Close the connection."""
        self._sync(
            self._qmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout for QMP RPC execution.

        This timeout affects the `cmd`, `cmd_obj`, and `cmd_raw` methods.
        The `accept`, `pull_event` and `get_events` methods have their
        own configurable timeouts.

        :param timeout:
            timeout in seconds, or None.
            None will wait indefinitely.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Send a file descriptor to the remote via SCM_RIGHTS.

        :param fd: The file descriptor to send.
        """
        self._qmp.send_fd_scm(fd)

    def __del__(self) -> None:
        qmp = getattr(self, '_qmp', None)
        if qmp is None:
            # __init__ raised before the client was created, so there
            # is no connection to tear down.
            return

        if qmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )
319            )
320