#!/usr/bin/env python
# NBD server - fault injection utility
#
# Configuration file syntax:
#   [inject-error "disconnect-neg1"]
#   event=neg1
#   io=readwrite
#   when=before
#
# Note that Python's ConfigParser squashes together all sections with the same
# name, so give each [inject-error] a unique name.
#
# inject-error options:
#   event - name of the trigger event
#       "neg-classic" - negotiation struct sent with --classic-negotiation
#       "neg1" - first part of negotiation struct
#       "export" - export struct
#       "export-name" - export name that follows the export struct
#       "neg2" - second part of negotiation struct
#       "request" - NBD request struct
#       "reply" - NBD reply struct
#       "data" - request/reply data
#   io - I/O direction that triggers this rule:
#       "read", "write", or "readwrite"
#       default: readwrite
#   when - after how many bytes to inject the fault
#       -1 - inject error after I/O
#       0 - inject error before I/O
#       integer - inject error after integer bytes
#       "before" - alias for 0
#       "after" - alias for -1
#       default: before
#
# Currently the only error injection action is to terminate the server process.
# This resets the TCP connection and thus forces the client to handle
# unexpected connection termination.
#
# Other error injection actions could be added in the future.
#
# Copyright Red Hat, Inc. 2014
#
# Authors:
#   Stefan Hajnoczi <stefanha@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.

from __future__ import print_function
import sys
import socket
import struct
import collections
if sys.version_info.major >= 3:
    import configparser
else:
    import ConfigParser as configparser

FAKE_DISK_SIZE = 8 * 1024 * 1024 * 1024 # 8 GB

# Protocol constants
NBD_CMD_READ = 0
NBD_CMD_WRITE = 1
NBD_CMD_DISC = 2
NBD_REQUEST_MAGIC = 0x25609513
NBD_SIMPLE_REPLY_MAGIC = 0x67446698
NBD_PASSWD = 0x4e42444d41474943
NBD_OPTS_MAGIC = 0x49484156454F5054
NBD_CLIENT_MAGIC = 0x0000420281861253
NBD_OPT_EXPORT_NAME = 1 << 0

# Protocol structs
neg_classic_struct = struct.Struct('>QQQI124x')
neg1_struct = struct.Struct('>QQH')
export_tuple = collections.namedtuple('Export', 'reserved magic opt len')
export_struct = struct.Struct('>IQII')
neg2_struct = struct.Struct('>QH124x')
request_tuple = collections.namedtuple('Request', 'magic type handle from_ len')
request_struct = struct.Struct('>IIQQI')
reply_struct = struct.Struct('>IIQ')

def err(msg):
    '''Write msg to stderr and terminate the process with exit status 1'''
    sys.stderr.write(msg + '\n')
    sys.exit(1)

def recvall(sock, bufsize):
    '''Receive exactly bufsize bytes from sock

    Raises Exception('unexpected disconnect') if the peer closes the
    connection before bufsize bytes have arrived.
    '''
    received = 0
    chunks = []
    while received < bufsize:
        chunk = sock.recv(bufsize - received)
        if not chunk:
            raise Exception('unexpected disconnect')
        chunks.append(chunk)
        received += len(chunk)
    return b''.join(chunks)

class Rule(object):
    '''A single [inject-error] rule from the configuration file'''

    def __init__(self, name, event, io, when):
        self.name = name    # config section name, used in log output
        self.event = event  # event name that triggers the rule
        self.io = io        # 'read', 'write' or 'readwrite'
        self.when = when    # byte offset; 0 = before I/O, -1 = after I/O

    def match(self, event, io):
        '''Return True if this rule applies to the given event and direction'''
        if event != self.event:
            return False
        if io != self.io and self.io != 'readwrite':
            return False
        return True

class FaultInjectionSocket(object):
    '''Socket wrapper that terminates the process when a rule fires'''

    def __init__(self, sock, rules):
        self.sock = sock
        self.rules = rules

    def check(self, event, io, bufsize=None):
        '''Apply the rules to one I/O operation

        Called with bufsize set before the I/O and with bufsize=None after
        it.  Returns the number of bytes that should be transferred.  When a
        matching rule is due (when == 0, or the post-I/O call) the socket is
        closed and the process exits with status 0.
        '''
        for rule in self.rules:
            if rule.match(event, io):
                if rule.when == 0 or bufsize is None:
                    print('Closing connection on rule match %s' % rule.name)
                    self.sock.close()
                    sys.stdout.flush()
                    sys.exit(0)
                if rule.when != -1:
                    # Never request more than this I/O carries: an oversized
                    # "when" would make recv() wait for bytes that belong to
                    # the next message (or that never arrive).
                    return min(rule.when, bufsize)
        return bufsize

    def send(self, buf, event):
        '''Send buf, possibly truncated and followed by a fault'''
        bufsize = self.check(event, 'write', bufsize=len(buf))
        self.sock.sendall(buf[:bufsize])
        self.check(event, 'write')

    def recv(self, bufsize, event):
        '''Receive bufsize bytes, possibly fewer and followed by a fault'''
        bufsize = self.check(event, 'read', bufsize=bufsize)
        data = recvall(self.sock, bufsize)
        self.check(event, 'read')
        return data

    def close(self):
        '''Close the underlying socket'''
        self.sock.close()

def negotiate_classic(conn):
    '''Send the single classic negotiation struct (event "neg-classic")'''
    buf = neg_classic_struct.pack(NBD_PASSWD, NBD_CLIENT_MAGIC,
                                  FAKE_DISK_SIZE, 0)
    conn.send(buf, event='neg-classic')

def negotiate_export(conn):
    '''Run the export-name negotiation sequence with the client'''
    # Send negotiation part 1
    buf = neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0)
    conn.send(buf, event='neg1')

    # Receive export option
    buf = conn.recv(export_struct.size, event='export')
    export = export_tuple._make(export_struct.unpack(buf))
    assert export.magic == NBD_OPTS_MAGIC
    assert export.opt == NBD_OPT_EXPORT_NAME
    # The export name must be consumed from the stream but is not used
    conn.recv(export.len, event='export-name')

    # Send negotiation part 2
    buf = neg2_struct.pack(FAKE_DISK_SIZE, 0)
    conn.send(buf, event='neg2')

def negotiate(conn, use_export):
    '''Negotiate export with client'''
    if use_export:
        negotiate_export(conn)
    else:
        negotiate_classic(conn)

def read_request(conn):
    '''Parse NBD request from client'''
    buf = conn.recv(request_struct.size, event='request')
    req = request_tuple._make(request_struct.unpack(buf))
    assert req.magic == NBD_REQUEST_MAGIC
    return req

def write_reply(conn, error, handle):
    '''Send an NBD simple reply header for the request identified by handle'''
    buf = reply_struct.pack(NBD_SIMPLE_REPLY_MAGIC, error, handle)
    conn.send(buf, event='reply')

def handle_connection(conn, use_export):
    '''Serve one client until it disconnects or sends an unknown command

    Reads return zero-filled data; written data is received and discarded.
    '''
    negotiate(conn, use_export)
    while True:
        req = read_request(conn)
        if req.type == NBD_CMD_READ:
            write_reply(conn, 0, req.handle)
            conn.send(b'\0' * req.len, event='data')
        elif req.type == NBD_CMD_WRITE:
            conn.recv(req.len, event='data')
            write_reply(conn, 0, req.handle)
        elif req.type == NBD_CMD_DISC:
            break
        else:
            print('unrecognized command type %#02x' % req.type)
            break
    conn.close()

def run_server(sock, rules, use_export):
    '''Accept and serve client connections one at a time, forever'''
    while True:
        conn, _ = sock.accept()
        handle_connection(FaultInjectionSocket(conn, rules), use_export)

def parse_inject_error(name, options):
    '''Build a Rule from the options of one [inject-error] section

    Exits via err() on a missing or invalid option.
    '''
    if 'event' not in options:
        err('missing "event" option in %s' % name)
    event = options['event']
    # 'export-name' is included because negotiate_export() raises that event;
    # without it no rule could ever target the export name transfer.
    if event not in ('neg-classic', 'neg1', 'export', 'export-name', 'neg2',
                     'request', 'reply', 'data'):
        err('invalid "event" option value "%s" in %s' % (event, name))
    io = options.get('io', 'readwrite')
    if io not in ('read', 'write', 'readwrite'):
        err('invalid "io" option value "%s" in %s' % (io, name))
    when = options.get('when', 'before')
    try:
        when = int(when)
    except ValueError:
        if when == 'before':
            when = 0
        elif when == 'after':
            when = -1
        else:
            err('invalid "when" option value "%s" in %s' % (when, name))
    return Rule(name, event, io, when)

def parse_config(config):
    '''Convert every section of a parsed config into a list of Rules

    Exits via err() if a section name does not start with "inject-error".
    '''
    rules = []
    for name in config.sections():
        if name.startswith('inject-error'):
            options = dict(config.items(name))
            rules.append(parse_inject_error(name, options))
        else:
            err('invalid config section name: %s' % name)
    return rules

def load_rules(filename):
    '''Read the configuration file and return its list of Rules'''
    config = configparser.RawConfigParser()
    # readfp() was removed in Python 3.12; read_file() replaces it but does
    # not exist in Python 2's ConfigParser, so pick whichever is available.
    read_file = getattr(config, 'read_file', None) or config.readfp
    with open(filename, 'rt') as f:
        read_file(f, filename)
    return parse_config(config)

def open_socket(path):
    '''Open a TCP or UNIX domain listen socket'''
    if ':' in path:
        host, port = path.split(':', 1)
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, int(port)))

        # If given port was 0 the final port number is now available
        path = '%s:%d' % sock.getsockname()
    else:
        sock = socket.socket(socket.AF_UNIX)
        sock.bind(path)
    sock.listen(0)
    print('Listening on %s' % path)
    sys.stdout.flush() # another process may be waiting, show message now
    return sock

def usage(args):
    '''Print the command-line synopsis to stderr and exit with status 1'''
    sys.stderr.write('usage: %s [--classic-negotiation] <tcp-port>|<unix-path> <config-file>\n' % args[0])
    sys.stderr.write('Run an fault injector NBD server with rules defined in a config file.\n')
    sys.exit(1)

def main(args):
    '''Parse the command line, then run the server

    args is the full argument vector including the program name.
    '''
    if len(args) != 3 and len(args) != 4:
        usage(args)
    use_export = args[1] != '--classic-negotiation'
    # The flag adds one argument; validate the count before opening anything
    # so a missing config file yields the usage text, not an IndexError.
    if len(args) != (3 if use_export else 4):
        usage(args)
    sock = open_socket(args[1 if use_export else 2])
    rules = load_rules(args[2 if use_export else 3])
    run_server(sock, rules, use_export)
    return 0

if __name__ == '__main__':
    sys.exit(main(sys.argv))