#!/usr/bin/env python3

"""
QEMU Guest Agent Client

Usage:

Start QEMU with:

# qemu [...] -chardev socket,path=/tmp/qga.sock,server,wait=off,id=qga0 \
  -device virtio-serial \
  -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0

Run the script:

$ qemu-ga-client --address=/tmp/qga.sock <command> [args...]

or

$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
$ qemu-ga-client <command> [args...]

For example:

$ qemu-ga-client cat /etc/resolv.conf
# Generated by NetworkManager
nameserver 10.0.2.3
$ qemu-ga-client fsfreeze status
thawed
$ qemu-ga-client fsfreeze freeze
2 filesystems frozen

See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
"""

# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.

import argparse
import base64
import errno
import os
import random
import sys
from typing import (
    Any,
    Callable,
    Dict,
    Optional,
    Sequence,
)


sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import SocketAddrT


# This script has not seen many patches or careful attention in quite
# some time. If you would like to improve it, please review the design
# carefully and add docstrings at that point in time. Until then:

# pylint: disable=missing-docstring


class QemuGuestAgent(qmp.QEMUMonitorProtocol):
    """Monitor connection that turns attribute access into agent commands.

    Looking up an attribute that does not otherwise exist, e.g.
    ``file_read``, yields a callable that issues the ``guest-file-read``
    command with the given keyword arguments.
    """

    def __getattr__(self, name: str) -> Callable[..., Any]:
        def wrapper(**kwds: object) -> object:
            return self.command('guest-' + name.replace('_', '-'), **kwds)
        return wrapper


class QemuGuestAgentClient:
    """Convenience wrapper around a connected :class:`QemuGuestAgent`."""

    def __init__(self, address: SocketAddrT):
        """Connect to the agent at *address* without capability negotiation."""
        self.qga = QemuGuestAgent(address)
        self.qga.connect(negotiate=False)

    def sync(self, timeout: Optional[float] = 3) -> None:
        """Synchronise with the agent.

        Pings first so that a dead agent is reported instead of blocking,
        then repeats ``guest-sync`` with a random id until that id is
        echoed back.

        Raises:
            EnvironmentError: the agent did not answer the ping in time.
        """
        # Avoid being blocked forever
        if not self.ping(timeout):
            raise EnvironmentError('Agent seems not alive')
        uid = random.randint(0, (1 << 32) - 1)
        while True:
            ret = self.qga.sync(id=uid)
            if isinstance(ret, int) and int(ret) == uid:
                break

    def __file_read_all(self, handle: int) -> bytes:
        """Read from the open guest file *handle* until the agent reports EOF."""
        # Collect the chunks and join once rather than growing a bytes
        # object with repeated concatenation.
        chunks = []
        eof = False
        while not eof:
            ret = self.qga.file_read(handle=handle, count=1024)
            chunks.append(base64.b64decode(ret['buf-b64']))
            eof = ret['eof']
        return b''.join(chunks)

    def read(self, path: str) -> bytes:
        """Return the content of the guest file *path*.

        The guest file handle is closed even when reading fails.
        """
        handle = self.qga.file_open(path=path)
        try:
            data = self.__file_read_all(handle)
        finally:
            self.qga.file_close(handle=handle)
        return data

    def info(self) -> str:
        """Return a printable summary of the agent version and commands."""
        info = self.qga.info()
        supported = info['supported_commands']
        enabled = [c['name'] for c in supported if c['enabled']]
        disabled = [c['name'] for c in supported if not c['enabled']]

        msgs = [
            'version: ' + info['version'],
            'supported_commands:',
            '\tenabled: ' + ', '.join(enabled),
            '\tdisabled: ' + ', '.join(disabled),
        ]
        return '\n'.join(msgs)

    @classmethod
    def __gen_ipv4_netmask(cls, prefixlen: int) -> str:
        """Return the dotted-quad netmask for an IPv4 prefix length."""
        mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
        return '.'.join([str(mask >> 24),
                         str((mask >> 16) & 0xff),
                         str((mask >> 8) & 0xff),
                         str(mask & 0xff)])

    def ifconfig(self) -> str:
        """Return an ifconfig-like listing of the guest network interfaces."""
        nifs = self.qga.network_get_interfaces()

        msgs = []
        for nif in nifs:
            msgs.append(nif['name'] + ':')
            for ipaddr in nif.get('ip-addresses', []):
                if ipaddr['ip-address-type'] == 'ipv4':
                    addr = ipaddr['ip-address']
                    mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
                    msgs.append(f"\tinet {addr}  netmask {mask}")
                elif ipaddr['ip-address-type'] == 'ipv6':
                    addr = ipaddr['ip-address']
                    prefix = ipaddr['prefix']
                    msgs.append(f"\tinet6 {addr}  prefixlen {prefix}")
            # Like 'ip-addresses', the hardware address may be missing
            # from an interface entry, so do not index it blindly.
            hwaddr = nif.get('hardware-address')
            if hwaddr is not None and hwaddr != '00:00:00:00:00:00':
                msgs.append("\tether " + hwaddr)

        return '\n'.join(msgs)

    def ping(self, timeout: Optional[float]) -> bool:
        """Ping the agent; return False if it timed out, True otherwise.

        The timeout is set on the underlying connection before pinging.
        """
        self.qga.settimeout(timeout)
        try:
            self.qga.ping()
        except TimeoutError:
            return False
        return True

    def fsfreeze(self, cmd: str) -> object:
        """Run the fsfreeze sub-command *cmd* (status, freeze or thaw).

        Raises:
            ValueError: *cmd* is not one of the supported sub-commands.
        """
        if cmd not in ['status', 'freeze', 'thaw']:
            raise ValueError('Invalid command: ' + cmd)
        # Can be int (freeze, thaw) or GuestFsfreezeStatus (status)
        return getattr(self.qga, 'fsfreeze' + '_' + cmd)()

    def fstrim(self, minimum: int) -> Dict[str, object]:
        """Issue guest-fstrim with *minimum* and return the response dict."""
        # returns GuestFilesystemTrimResponse
        ret = self.qga.fstrim(minimum=minimum)
        assert isinstance(ret, dict)
        return ret

    def suspend(self, mode: str) -> None:
        """Suspend the guest to *mode* (disk, ram or hybrid).

        Raises:
            ValueError: *mode* is not one of the supported modes.
        """
        if mode not in ['disk', 'ram', 'hybrid']:
            raise ValueError('Invalid mode: ' + mode)

        try:
            getattr(self.qga, 'suspend' + '_' + mode)()
            # On error an exception will be raised
        except TimeoutError:
            # On success the command will time out
            return

    def shutdown(self, mode: str = 'powerdown') -> None:
        """Shut the guest down using *mode* (powerdown, halt or reboot).

        A timeout while waiting for the reply is ignored.

        Raises:
            ValueError: *mode* is not one of the supported modes.
        """
        if mode not in ['powerdown', 'halt', 'reboot']:
            raise ValueError('Invalid mode: ' + mode)

        try:
            self.qga.shutdown(mode=mode)
        except TimeoutError:
            pass


def _reject_args(usage: str, args: Sequence[str]) -> None:
    """Exit with a usage message when a no-argument command got arguments.

    Used instead of ``assert`` so that the check also runs under ``-O``
    and the user sees a message rather than a traceback.
    """
    if args:
        print('Invalid argument')
        print('Usage: ' + usage)
        sys.exit(1)


def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """cat <file>: write the content of a guest file to standard output."""
    if len(args) != 1:
        print('Invalid argument')
        print('Usage: cat <file>')
        sys.exit(1)
    data = client.read(args[0])
    # print() on a bytes object would emit its repr (b'...'); write the raw
    # bytes so the output is the file content itself.
    sys.stdout.flush()
    sys.stdout.buffer.write(data)
    sys.stdout.buffer.flush()


def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """fsfreeze status|freeze|thaw: query or change the freeze state."""
    usage = 'Usage: fsfreeze status|freeze|thaw'
    if len(args) != 1:
        print('Invalid argument')
        print(usage)
        sys.exit(1)
    if args[0] not in ['status', 'freeze', 'thaw']:
        print('Invalid command: ' + args[0])
        print(usage)
        sys.exit(1)
    cmd = args[0]
    ret = client.fsfreeze(cmd)
    if cmd == 'status':
        print(ret)
        return

    assert isinstance(ret, int)
    verb = 'frozen' if cmd == 'freeze' else 'thawed'
    print(f"{ret:d} filesystems {verb}")


def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """fstrim [minimum]: trim guest filesystems (minimum defaults to 0)."""
    minimum = int(args[0]) if args else 0
    print(client.fstrim(minimum))


def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """ifconfig: print the guest network interfaces."""
    _reject_args('ifconfig', args)
    print(client.ifconfig())


def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """info: print the agent version and its supported commands."""
    _reject_args('info', args)
    print(client.info())


def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """ping [timeout]: exit with status 1 if the agent does not answer."""
    timeout = float(args[0]) if args else 3.0
    alive = client.ping(timeout)
    if not alive:
        # Without an explicit argument there is no args[0] to echo back;
        # report the default timeout that was actually used.
        shown = args[0] if args else timeout
        print("Not responded in %s sec" % shown)
        sys.exit(1)


def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """suspend disk|ram|hybrid: suspend the guest."""
    usage = 'Usage: suspend disk|ram|hybrid'
    if len(args) != 1:
        print('Invalid argument')
        print(usage)
        sys.exit(1)
    if args[0] not in ['disk', 'ram', 'hybrid']:
        print('Invalid command: ' + args[0])
        print(usage)
        sys.exit(1)
    client.suspend(args[0])


def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """shutdown: power the guest down."""
    _reject_args('shutdown', args)
    client.shutdown()


_cmd_powerdown = _cmd_shutdown


def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """halt: halt the guest."""
    _reject_args('halt', args)
    client.shutdown('halt')


def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
    """reboot: reboot the guest."""
    _reject_args('reboot', args)
    client.shutdown('reboot')


# Every module-level name starting with '_cmd_' is exposed as a command.
commands = [m[len('_cmd_'):] for m in dir() if m.startswith('_cmd_')]


def send_command(address: str, cmd: str, args: Sequence[str]) -> None:
    """Connect to the agent at *address* and run *cmd* with *args*.

    Prints a message and exits with status 1 when the socket path does not
    exist, the command is unknown, or the connection fails.
    """
    if not os.path.exists(address):
        print('%s not found' % address)
        sys.exit(1)

    if cmd not in commands:
        print('Invalid command: ' + cmd)
        print('Available commands: ' + ', '.join(commands))
        sys.exit(1)

    try:
        client = QemuGuestAgentClient(address)
    except OSError as err:
        print(err)
        if err.errno == errno.ECONNREFUSED:
            print('Hint: qemu is not running?')
        sys.exit(1)

    # 'fsfreeze' with no sub-command must reach _cmd_fsfreeze so it can
    # print its usage; guard the index instead of raising IndexError here.
    if cmd == 'fsfreeze' and args and args[0] == 'freeze':
        client.sync(60)
    elif cmd != 'ping':
        client.sync()

    globals()['_cmd_' + cmd](client, args)


def main() -> None:
    """Parse the command line and dispatch the requested command."""
    address = os.environ.get('QGA_CLIENT_ADDRESS')

    parser = argparse.ArgumentParser()
    parser.add_argument('--address', action='store',
                        default=address,
                        help='Specify a ip:port pair or a unix socket path')
    parser.add_argument('command', choices=commands)
    parser.add_argument('args', nargs='*')

    args = parser.parse_args()
    if args.address is None:
        # parser.error() prints the message and exits by itself.
        parser.error('address is not specified')

    send_command(args.address, args.command, args.args)


if __name__ == '__main__':
    main()