#!/usr/bin/env python3
#
# Test case for NBD's blockdev-add interface
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import os
import random
import socket
import stat
import time
import iotests
from iotests import cachemode, aiomode, imgfmt, qemu_img, qemu_nbd, qemu_nbd_early_pipe

# Two adjacent, non-overlapping port ranges: one for the plain inet tests
# and one for the IPv6 test.
NBD_PORT_START = 32768
NBD_PORT_END = NBD_PORT_START + 1024
NBD_IPV6_PORT_START = NBD_PORT_END
NBD_IPV6_PORT_END = NBD_IPV6_PORT_START + 1024

test_img = os.path.join(iotests.test_dir, 'test.img')
unix_socket = os.path.join(iotests.sock_dir, 'nbd.socket')


def flatten_sock_addr(crumpled_address):
    """Return a flat copy of a {'type': ..., 'data': {...}} address.

    The result holds the 'type' member plus every member of the nested
    'data' dict at top level.  The input dict is not modified.
    """
    result = { 'type': crumpled_address['type'] }
    result.update(crumpled_address['data'])
    return result


class NBDBlockdevAddBase(iotests.QMPTestCase):
    """Shared client-side helpers; subclasses provide self.vm and a server."""

    def blockdev_add_options(self, address, export, node_name):
        """Build the blockdev-add arguments for a raw node on top of NBD.

        address:   value for the NBD driver's 'server' option
        export:    export name, or None to omit the 'export' option
        node_name: node name of the raw node to create
        """
        options = { 'node-name': node_name,
                    'driver': 'raw',
                    'file': {
                        'driver': 'nbd',
                        'read-only': True,
                        'server': address
                    } }
        if export is not None:
            options['file']['export'] = export
        return options

    def client_test(self, filename, address, export=None,
                    node_name='nbd-blockdev', delete=True):
        """Add an NBD-backed node to self.vm and verify its filename.

        filename is the expected image filename of the new node.  A str is
        compared through assert_qmp(); anything else is compared with
        assert_json_filename_equal().  The node is removed again with
        blockdev-del unless delete is False.
        """
        bao = self.blockdev_add_options(address, export, node_name)
        result = self.vm.qmp('blockdev-add', **bao)
        self.assert_qmp(result, 'return', {})

        found = False
        result = self.vm.qmp('query-named-block-nodes')
        for node in result['return']:
            if node['node-name'] == node_name:
                found = True
                if isinstance(filename, str):
                    self.assert_qmp(node, 'image/filename', filename)
                else:
                    self.assert_json_filename_equal(node['image']['filename'],
                                                    filename)
                break
        self.assertTrue(found)

        if delete:
            result = self.vm.qmp('blockdev-del', node_name=node_name)
            self.assert_qmp(result, 'return', {})


class QemuNBD(NBDBlockdevAddBase):
    """Client tests against a server started via qemu_nbd_early_pipe()."""

    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(test_img)
        # The socket file only exists if the test used a UNIX socket.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    def _try_server_up(self, *args):
        """Start the server on test_img with the extra arguments in args.

        Returns True if the status is 0, False if the message contains
        'Address already in use'; any other outcome fails the test with
        that message.
        """
        status, msg = qemu_nbd_early_pipe('-f', imgfmt, test_img, *args)
        if status == 0:
            return True
        if 'Address already in use' in msg:
            return False
        self.fail(msg)

    def _server_up(self, *args):
        """Like _try_server_up(), but a busy address fails the test too."""
        self.assertTrue(self._try_server_up(*args))

    def test_inet(self):
        # Retry with a new random port while the chosen one is in use.
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            if self._try_server_up('-b', 'localhost', '-p', str(nbd_port)):
                break

        address = { 'type': 'inet',
                    'data': {
                        'host': 'localhost',
                        'port': str(nbd_port)
                    } }
        self.client_test('nbd://localhost:%i' % nbd_port,
                         flatten_sock_addr(address))

    def test_unix(self):
        self._server_up('-k', unix_socket)
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self.client_test('nbd+unix://?socket=' + unix_socket,
                         flatten_sock_addr(address))


class BuiltinNBD(NBDBlockdevAddBase):
    """Client tests against the NBD server of a second VM (self.server)."""

    def setUp(self):
        qemu_img('create', '-f', iotests.imgfmt, test_img, '64k')
        self.vm = iotests.VM()
        self.vm.launch()
        self.server = iotests.VM('.server')
        self.server.add_drive_raw('if=none,id=nbd-export,' +
                                  'file=%s,' % test_img +
                                  'format=%s,' % imgfmt +
                                  'cache=%s,' % cachemode +
                                  'aio=%s' % aiomode)
        self.server.launch()

    def tearDown(self):
        self.vm.shutdown()
        self.server.shutdown()
        os.remove(test_img)
        # The socket file only exists if the test used a UNIX socket.
        try:
            os.remove(unix_socket)
        except OSError:
            pass

    # Returns False on EADDRINUSE; fails an assertion on other errors.
    # Returns True on success.
    def _try_server_up(self, address, export_name=None, export_name2=None):
        """Start the NBD server on address and export the 'nbd-export' drive.

        export_name names the first export; None leaves the name to the
        server.  export_name2, if given, adds a second export of the same
        device under that name.
        """
        result = self.server.qmp('nbd-server-start', addr=address)
        if 'error' in result and \
           'Address already in use' in result['error']['desc']:
            return False
        self.assert_qmp(result, 'return', {})

        if export_name is None:
            result = self.server.qmp('nbd-server-add', device='nbd-export')
        else:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name)
        self.assert_qmp(result, 'return', {})

        if export_name2 is not None:
            result = self.server.qmp('nbd-server-add', device='nbd-export',
                                     name=export_name2)
            self.assert_qmp(result, 'return', {})

        return True

    def _server_up(self, address, export_name=None, export_name2=None):
        """Like _try_server_up(), but a busy address fails the test too."""
        self.assertTrue(self._try_server_up(address, export_name, export_name2))

    def _server_down(self):
        """Stop the NBD server on self.server."""
        result = self.server.qmp('nbd-server-stop')
        self.assert_qmp(result, 'return', {})

    def do_test_inet(self, export_name=None):
        """Run one inet client test, optionally with an explicit export name."""
        # Retry with a new random port while the chosen one is in use.
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, export_name):
                break

        # With no explicit name, the client asks for 'nbd-export'.
        export_name = export_name or 'nbd-export'
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, export_name),
                         flatten_sock_addr(address), export_name)
        self._server_down()

    def test_inet_default_export_name(self):
        self.do_test_inet()

    def test_inet_same_export_name(self):
        self.do_test_inet('nbd-export')

    def test_inet_different_export_name(self):
        self.do_test_inet('shadow')

    def test_inet_two_exports(self):
        # Retry with a new random port while the chosen one is in use.
        while True:
            nbd_port = random.randrange(NBD_PORT_START, NBD_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': 'localhost',
                            'port': str(nbd_port)
                        } }
            if self._try_server_up(address, 'exp1', 'exp2'):
                break

        # Keep both nodes attached at the same time (delete=False), then
        # remove them explicitly.
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp1'),
                         flatten_sock_addr(address), 'exp1', 'node1', False)
        self.client_test('nbd://localhost:%i/%s' % (nbd_port, 'exp2'),
                         flatten_sock_addr(address), 'exp2', 'node2', False)
        result = self.vm.qmp('blockdev-del', node_name='node1')
        self.assert_qmp(result, 'return', {})
        result = self.vm.qmp('blockdev-del', node_name='node2')
        self.assert_qmp(result, 'return', {})
        self._server_down()

    def test_inet6(self):
        try:
            socket.getaddrinfo("::0", "0", socket.AF_INET6,
                               socket.SOCK_STREAM, socket.IPPROTO_TCP,
                               socket.AI_ADDRCONFIG | socket.AI_CANONNAME)
        except socket.gaierror:
            # IPv6 not available, skip
            return

        # Retry with a new random port while the chosen one is in use.
        while True:
            nbd_port = random.randrange(NBD_IPV6_PORT_START, NBD_IPV6_PORT_END)
            address = { 'type': 'inet',
                        'data': {
                            'host': '::1',
                            'port': str(nbd_port),
                            'ipv4': False,
                            'ipv6': True
                        } }
            if self._try_server_up(address):
                break

        # The expected filename is given as a dict, so client_test()
        # compares it with assert_json_filename_equal().
        filename = { 'driver': 'raw',
                     'file': {
                         'driver': 'nbd',
                         'export': 'nbd-export',
                         'server': flatten_sock_addr(address)
                     } }
        self.client_test(filename, flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_unix(self):
        address = { 'type': 'unix',
                    'data': { 'path': unix_socket } }
        self._server_up(address)
        self.client_test('nbd+unix:///nbd-export?socket=' + unix_socket,
                         flatten_sock_addr(address), 'nbd-export')
        self._server_down()

    def test_fd(self):
        self._server_up({ 'type': 'unix',
                          'data': { 'path': unix_socket } })

        # The socket stays open for the whole test and is closed on exit,
        # including when an assertion fails, so the descriptor is not
        # leaked into later test cases.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sockfd:
            sockfd.connect(unix_socket)

            result = self.vm.send_fd_scm(fd=sockfd.fileno())
            self.assertEqual(result, 0, 'Failed to send socket FD')

            result = self.vm.qmp('getfd', fdname='nbd-fifo')
            self.assert_qmp(result, 'return', {})

            address = { 'type': 'fd',
                        'data': { 'str': 'nbd-fifo' } }
            filename = { 'driver': 'raw',
                         'file': {
                             'driver': 'nbd',
                             'export': 'nbd-export',
                             'server': flatten_sock_addr(address)
                         } }
            self.client_test(filename, flatten_sock_addr(address), 'nbd-export')

            self._server_down()


if __name__ == '__main__':
    iotests.main(supported_fmts=['raw'],
                 supported_protocols=['nbd'])