1""" 2QEMU Object Model FUSE filesystem tool 3 4This script offers a simple FUSE filesystem within which the QOM tree 5may be browsed, queried and edited using traditional shell tooling. 6 7This script requires the 'fusepy' python package. 8 9 10usage: qom-fuse [-h] [--socket SOCKET] <mount> 11 12Mount a QOM tree as a FUSE filesystem 13 14positional arguments: 15 <mount> Mount point 16 17optional arguments: 18 -h, --help show this help message and exit 19 --socket SOCKET, -s SOCKET 20 QMP socket path or address (addr:port). May also be 21 set via QMP_SOCKET environment variable. 22""" 23## 24# Copyright IBM, Corp. 2012 25# Copyright (C) 2020 Red Hat, Inc. 26# 27# Authors: 28# Anthony Liguori <aliguori@us.ibm.com> 29# Markus Armbruster <armbru@redhat.com> 30# 31# This work is licensed under the terms of the GNU GPL, version 2 or later. 32# See the COPYING file in the top-level directory. 33## 34 35import argparse 36from errno import ENOENT, EPERM 37import stat 38import sys 39from typing import ( 40 IO, 41 Dict, 42 Iterator, 43 Mapping, 44 Optional, 45 Union, 46) 47 48import fuse 49from fuse import FUSE, FuseOSError, Operations 50 51from qemu.qmp import ExecuteError 52 53from .qom_common import QOMCommand 54 55 56fuse.fuse_python_api = (0, 2) 57 58 59class QOMFuse(QOMCommand, Operations): 60 """ 61 QOMFuse implements both fuse.Operations and QOMCommand. 62 63 Operations implements the FS, and QOMCommand implements the CLI command. 64 """ 65 name = 'fuse' 66 help = 'Mount a QOM tree as a FUSE filesystem' 67 fuse: FUSE 68 69 @classmethod 70 def configure_parser(cls, parser: argparse.ArgumentParser) -> None: 71 super().configure_parser(parser) 72 parser.add_argument( 73 'mount', 74 metavar='<mount>', 75 action='store', 76 help="Mount point", 77 ) 78 79 def __init__(self, args: argparse.Namespace): 80 super().__init__(args) 81 self.mount = args.mount 82 self.ino_map: Dict[str, int] = {} 83 self.ino_count = 1 84 85 def run(self) -> int: 86 print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr) 87 self.fuse = FUSE(self, self.mount, foreground=True) 88 return 0 89 90 def get_ino(self, path: str) -> int: 91 """Get an inode number for a given QOM path.""" 92 if path in self.ino_map: 93 return self.ino_map[path] 94 self.ino_map[path] = self.ino_count 95 self.ino_count += 1 96 return self.ino_map[path] 97 98 def is_object(self, path: str) -> bool: 99 """Is the given QOM path an object?""" 100 try: 101 self.qom_list(path) 102 return True 103 except ExecuteError: 104 return False 105 106 def is_property(self, path: str) -> bool: 107 """Is the given QOM path a property?""" 108 path, prop = path.rsplit('/', 1) 109 if path == '': 110 path = '/' 111 try: 112 for item in self.qom_list(path): 113 if item.name == prop: 114 return True 115 return False 116 except ExecuteError: 117 return False 118 119 def is_link(self, path: str) -> bool: 120 """Is the given QOM path a link?""" 121 path, prop = path.rsplit('/', 1) 122 if path == '': 123 path = '/' 124 try: 125 for item in self.qom_list(path): 126 if item.name == prop and item.link: 127 return True 128 return False 129 except ExecuteError: 130 return False 131 132 def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes: 133 if not self.is_property(path): 134 raise FuseOSError(ENOENT) 135 136 path, prop = path.rsplit('/', 1) 137 if path == '': 138 path = '/' 139 try: 140 data = str(self.qmp.cmd('qom-get', path=path, property=prop)) 141 data += '\n' # make values shell friendly 142 except ExecuteError as err: 143 raise FuseOSError(EPERM) from err 144 145 if offset > len(data): 146 return b'' 147 148 return bytes(data[offset:][:size], encoding='utf-8') 149 150 def readlink(self, path: str) -> Union[bool, str]: 151 if not self.is_link(path): 152 return False 153 path, prop = path.rsplit('/', 1) 154 prefix = '/'.join(['..'] * (len(path.split('/')) - 1)) 155 return prefix + str(self.qmp.cmd('qom-get', path=path, 156 property=prop)) 157 158 def getattr(self, path: str, 159 fh: Optional[IO[bytes]] = None) -> Mapping[str, object]: 160 if self.is_link(path): 161 value = { 162 'st_mode': 0o755 | stat.S_IFLNK, 163 'st_ino': self.get_ino(path), 164 'st_dev': 0, 165 'st_nlink': 2, 166 'st_uid': 1000, 167 'st_gid': 1000, 168 'st_size': 4096, 169 'st_atime': 0, 170 'st_mtime': 0, 171 'st_ctime': 0 172 } 173 elif self.is_object(path): 174 value = { 175 'st_mode': 0o755 | stat.S_IFDIR, 176 'st_ino': self.get_ino(path), 177 'st_dev': 0, 178 'st_nlink': 2, 179 'st_uid': 1000, 180 'st_gid': 1000, 181 'st_size': 4096, 182 'st_atime': 0, 183 'st_mtime': 0, 184 'st_ctime': 0 185 } 186 elif self.is_property(path): 187 value = { 188 'st_mode': 0o644 | stat.S_IFREG, 189 'st_ino': self.get_ino(path), 190 'st_dev': 0, 191 'st_nlink': 1, 192 'st_uid': 1000, 193 'st_gid': 1000, 194 'st_size': 4096, 195 'st_atime': 0, 196 'st_mtime': 0, 197 'st_ctime': 0 198 } 199 else: 200 raise FuseOSError(ENOENT) 201 return value 202 203 def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]: 204 yield '.' 205 yield '..' 206 for item in self.qom_list(path): 207 yield item.name 208