1""" 2QEMU Guest Agent Client 3 4Usage: 5 6Start QEMU with: 7 8# qemu [...] -chardev socket,path=/tmp/qga.sock,server=on,wait=off,id=qga0 \ 9 -device virtio-serial \ 10 -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0 11 12Run the script: 13 14$ qemu-ga-client --address=/tmp/qga.sock <command> [args...] 15 16or 17 18$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock 19$ qemu-ga-client <command> [args...] 20 21For example: 22 23$ qemu-ga-client cat /etc/resolv.conf 24# Generated by NetworkManager 25nameserver 10.0.2.3 26$ qemu-ga-client fsfreeze status 27thawed 28$ qemu-ga-client fsfreeze freeze 292 filesystems frozen 30 31See also: https://wiki.qemu.org/Features/QAPI/GuestAgent 32""" 33 34# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com> 35# 36# This work is licensed under the terms of the GNU GPL, version 2. See 37# the COPYING file in the top-level directory. 38 39import argparse 40import asyncio 41import base64 42import os 43import random 44import sys 45from typing import ( 46 Any, 47 Callable, 48 Dict, 49 Optional, 50 Sequence, 51) 52 53from qemu.qmp import ConnectError, SocketAddrT 54from qemu.qmp.legacy import QEMUMonitorProtocol 55 56 57# This script has not seen many patches or careful attention in quite 58# some time. If you would like to improve it, please review the design 59# carefully and add docstrings at that point in time. Until then: 60 61# pylint: disable=missing-docstring 62 63 64class QemuGuestAgent(QEMUMonitorProtocol): 65 def __getattr__(self, name: str) -> Callable[..., Any]: 66 def wrapper(**kwds: object) -> object: 67 return self.cmd('guest-' + name.replace('_', '-'), **kwds) 68 return wrapper 69 70 71class QemuGuestAgentClient: 72 def __init__(self, address: SocketAddrT): 73 self.qga = QemuGuestAgent(address) 74 self.qga.connect(negotiate=False) 75 76 def sync(self, timeout: Optional[float] = 3) -> None: 77 # Avoid being blocked forever 78 if not self.ping(timeout): 79 raise EnvironmentError('Agent seems not alive') 80 uid = random.randint(0, (1 << 32) - 1) 81 while True: 82 ret = self.qga.sync(id=uid) 83 if isinstance(ret, int) and int(ret) == uid: 84 break 85 86 def __file_read_all(self, handle: int) -> bytes: 87 eof = False 88 data = b'' 89 while not eof: 90 ret = self.qga.file_read(handle=handle, count=1024) 91 _data = base64.b64decode(ret['buf-b64']) 92 data += _data 93 eof = ret['eof'] 94 return data 95 96 def read(self, path: str) -> bytes: 97 handle = self.qga.file_open(path=path) 98 try: 99 data = self.__file_read_all(handle) 100 finally: 101 self.qga.file_close(handle=handle) 102 return data 103 104 def info(self) -> str: 105 info = self.qga.info() 106 107 msgs = [] 108 msgs.append('version: ' + info['version']) 109 msgs.append('supported_commands:') 110 enabled = [c['name'] for c in info['supported_commands'] 111 if c['enabled']] 112 msgs.append('\tenabled: ' + ', '.join(enabled)) 113 disabled = [c['name'] for c in info['supported_commands'] 114 if not c['enabled']] 115 msgs.append('\tdisabled: ' + ', '.join(disabled)) 116 117 return '\n'.join(msgs) 118 119 @classmethod 120 def __gen_ipv4_netmask(cls, prefixlen: int) -> str: 121 mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2) 122 return '.'.join([str(mask >> 24), 123 str((mask >> 16) & 0xff), 124 str((mask >> 8) & 0xff), 125 str(mask & 0xff)]) 126 127 def ifconfig(self) -> str: 128 nifs = self.qga.network_get_interfaces() 129 130 msgs = [] 131 for nif in nifs: 132 msgs.append(nif['name'] + ':') 133 if 'ip-addresses' in nif: 134 for ipaddr in nif['ip-addresses']: 135 if ipaddr['ip-address-type'] == 'ipv4': 136 addr = ipaddr['ip-address'] 137 mask = self.__gen_ipv4_netmask(int(ipaddr['prefix'])) 138 msgs.append(f"\tinet {addr} netmask {mask}") 139 elif ipaddr['ip-address-type'] == 'ipv6': 140 addr = ipaddr['ip-address'] 141 prefix = ipaddr['prefix'] 142 msgs.append(f"\tinet6 {addr} prefixlen {prefix}") 143 if nif['hardware-address'] != '00:00:00:00:00:00': 144 msgs.append("\tether " + nif['hardware-address']) 145 146 return '\n'.join(msgs) 147 148 def ping(self, timeout: Optional[float]) -> bool: 149 self.qga.settimeout(timeout) 150 try: 151 self.qga.ping() 152 except asyncio.TimeoutError: 153 return False 154 return True 155 156 def fsfreeze(self, cmd: str) -> object: 157 if cmd not in ['status', 'freeze', 'thaw']: 158 raise ValueError('Invalid command: ' + cmd) 159 # Can be int (freeze, thaw) or GuestFsfreezeStatus (status) 160 return getattr(self.qga, 'fsfreeze' + '_' + cmd)() 161 162 def fstrim(self, minimum: int) -> Dict[str, object]: 163 # returns GuestFilesystemTrimResponse 164 ret = getattr(self.qga, 'fstrim')(minimum=minimum) 165 assert isinstance(ret, dict) 166 return ret 167 168 def suspend(self, mode: str) -> None: 169 if mode not in ['disk', 'ram', 'hybrid']: 170 raise ValueError('Invalid mode: ' + mode) 171 172 try: 173 getattr(self.qga, 'suspend' + '_' + mode)() 174 # On error exception will raise 175 except asyncio.TimeoutError: 176 # On success command will timed out 177 pass 178 179 def shutdown(self, mode: str = 'powerdown') -> None: 180 if mode not in ['powerdown', 'halt', 'reboot']: 181 raise ValueError('Invalid mode: ' + mode) 182 183 try: 184 self.qga.shutdown(mode=mode) 185 except asyncio.TimeoutError: 186 pass 187 188 189def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 190 if len(args) != 1: 191 print('Invalid argument') 192 print('Usage: cat <file>') 193 sys.exit(1) 194 print(client.read(args[0])) 195 196 197def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 198 usage = 'Usage: fsfreeze status|freeze|thaw' 199 if len(args) != 1: 200 print('Invalid argument') 201 print(usage) 202 sys.exit(1) 203 if args[0] not in ['status', 'freeze', 'thaw']: 204 print('Invalid command: ' + args[0]) 205 print(usage) 206 sys.exit(1) 207 cmd = args[0] 208 ret = client.fsfreeze(cmd) 209 if cmd == 'status': 210 print(ret) 211 return 212 213 assert isinstance(ret, int) 214 verb = 'frozen' if cmd == 'freeze' else 'thawed' 215 print(f"{ret:d} filesystems {verb}") 216 217 218def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 219 if len(args) == 0: 220 minimum = 0 221 else: 222 minimum = int(args[0]) 223 print(client.fstrim(minimum)) 224 225 226def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 227 assert not args 228 print(client.ifconfig()) 229 230 231def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 232 assert not args 233 print(client.info()) 234 235 236def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 237 timeout = 3.0 if len(args) == 0 else float(args[0]) 238 alive = client.ping(timeout) 239 if not alive: 240 print("Not responded in %s sec" % args[0]) 241 sys.exit(1) 242 243 244def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 245 usage = 'Usage: suspend disk|ram|hybrid' 246 if len(args) != 1: 247 print('Less argument') 248 print(usage) 249 sys.exit(1) 250 if args[0] not in ['disk', 'ram', 'hybrid']: 251 print('Invalid command: ' + args[0]) 252 print(usage) 253 sys.exit(1) 254 client.suspend(args[0]) 255 256 257def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 258 assert not args 259 client.shutdown() 260 261 262_cmd_powerdown = _cmd_shutdown 263 264 265def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 266 assert not args 267 client.shutdown('halt') 268 269 270def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None: 271 assert not args 272 client.shutdown('reboot') 273 274 275commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m] 276 277 278def send_command(address: str, cmd: str, args: Sequence[str]) -> None: 279 if not os.path.exists(address): 280 print(f"'{address}' not found. (Is QEMU running?)") 281 sys.exit(1) 282 283 if cmd not in commands: 284 print('Invalid command: ' + cmd) 285 print('Available commands: ' + ', '.join(commands)) 286 sys.exit(1) 287 288 try: 289 client = QemuGuestAgentClient(address) 290 except ConnectError as err: 291 print(err) 292 if isinstance(err.exc, ConnectionError): 293 print('(Is QEMU running?)') 294 sys.exit(1) 295 296 if cmd == 'fsfreeze' and args[0] == 'freeze': 297 client.sync(60) 298 elif cmd != 'ping': 299 client.sync() 300 301 globals()['_cmd_' + cmd](client, args) 302 303 304def main() -> None: 305 address = os.environ.get('QGA_CLIENT_ADDRESS') 306 307 parser = argparse.ArgumentParser() 308 parser.add_argument('--address', action='store', 309 default=address, 310 help='Specify a ip:port pair or a unix socket path') 311 parser.add_argument('command', choices=commands) 312 parser.add_argument('args', nargs='*') 313 314 args = parser.parse_args() 315 if args.address is None: 316 parser.error('address is not specified') 317 sys.exit(1) 318 319 send_command(args.address, args.command, args.args) 320 321 322if __name__ == '__main__': 323 main() 324