1#!/usr/bin/env python 2 3# Tool for running fuzz tests 4# 5# Copyright (C) 2014 Maria Kustova <maria.k@catit.be> 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 2 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20 21import sys 22import os 23import signal 24import subprocess 25import random 26import shutil 27from itertools import count 28import getopt 29import StringIO 30import resource 31 32try: 33 import json 34except ImportError: 35 try: 36 import simplejson as json 37 except ImportError: 38 print >>sys.stderr, \ 39 "Warning: Module for JSON processing is not found.\n" \ 40 "'--config' and '--command' options are not supported." 41 42# Backing file sizes in MB 43MAX_BACKING_FILE_SIZE = 10 44MIN_BACKING_FILE_SIZE = 1 45 46 47def multilog(msg, *output): 48 """ Write an object to all of specified file descriptors.""" 49 for fd in output: 50 fd.write(msg) 51 fd.flush() 52 53 54def str_signal(sig): 55 """ Convert a numeric value of a system signal to the string one 56 defined by the current operational system. 57 """ 58 for k, v in signal.__dict__.items(): 59 if v == sig: 60 return k 61 62 63def run_app(fd, q_args): 64 """Start an application with specified arguments and return its exit code 65 or kill signal depending on the result of execution. 66 """ 67 devnull = open('/dev/null', 'r+') 68 process = subprocess.Popen(q_args, stdin=devnull, 69 stdout=subprocess.PIPE, 70 stderr=subprocess.PIPE) 71 out, err = process.communicate() 72 fd.write(out) 73 fd.write(err) 74 return process.returncode 75 76 77class TestException(Exception): 78 """Exception for errors risen by TestEnv objects.""" 79 pass 80 81 82class TestEnv(object): 83 84 """Test object. 85 86 The class sets up test environment, generates backing and test images 87 and executes application under tests with specified arguments and a test 88 image provided. 89 90 All logs are collected. 91 92 The summary log will contain short descriptions and statuses of tests in 93 a run. 94 95 The test log will include application (e.g. 'qemu-img') logs besides info 96 sent to the summary log. 97 """ 98 99 def __init__(self, test_id, seed, work_dir, run_log, 100 cleanup=True, log_all=False): 101 """Set test environment in a specified work directory. 102 103 Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and 104 'QEMU_IO' environment variables. 105 """ 106 if seed is not None: 107 self.seed = seed 108 else: 109 self.seed = str(random.randint(0, sys.maxint)) 110 random.seed(self.seed) 111 112 self.init_path = os.getcwd() 113 self.work_dir = work_dir 114 self.current_dir = os.path.join(work_dir, 'test-' + test_id) 115 self.qemu_img = os.environ.get('QEMU_IMG', 'qemu-img')\ 116 .strip().split(' ') 117 self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ') 118 self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'], 119 ['qemu-img', 'info', '-f', 'qcow2', '$test_img'], 120 ['qemu-io', '$test_img', '-c', 'read $off $len'], 121 ['qemu-io', '$test_img', '-c', 'write $off $len'], 122 ['qemu-io', '$test_img', '-c', 123 'aio_read $off $len'], 124 ['qemu-io', '$test_img', '-c', 125 'aio_write $off $len'], 126 ['qemu-io', '$test_img', '-c', 'flush'], 127 ['qemu-io', '$test_img', '-c', 128 'discard $off $len'], 129 ['qemu-io', '$test_img', '-c', 130 'truncate $off']] 131 for fmt in ['raw', 'vmdk', 'vdi', 'cow', 'qcow2', 'file', 132 'qed', 'vpc']: 133 self.commands.append( 134 ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt, 135 '$test_img', 'converted_image.' + fmt]) 136 137 try: 138 os.makedirs(self.current_dir) 139 except OSError, e: 140 print >>sys.stderr, \ 141 "Error: The working directory '%s' cannot be used. Reason: %s"\ 142 % (self.work_dir, e[1]) 143 raise TestException 144 self.log = open(os.path.join(self.current_dir, "test.log"), "w") 145 self.parent_log = open(run_log, "a") 146 self.failed = False 147 self.cleanup = cleanup 148 self.log_all = log_all 149 150 def _create_backing_file(self): 151 """Create a backing file in the current directory. 152 153 Return a tuple of a backing file name and format. 154 155 Format of a backing file is randomly chosen from all formats supported 156 by 'qemu-img create'. 157 """ 158 # All formats supported by the 'qemu-img create' command. 159 backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'cow', 'qcow2', 160 'file', 'qed', 'vpc']) 161 backing_file_name = 'backing_img.' + backing_file_fmt 162 backing_file_size = random.randint(MIN_BACKING_FILE_SIZE, 163 MAX_BACKING_FILE_SIZE) * (1 << 20) 164 cmd = self.qemu_img + ['create', '-f', backing_file_fmt, 165 backing_file_name, str(backing_file_size)] 166 temp_log = StringIO.StringIO() 167 retcode = run_app(temp_log, cmd) 168 if retcode == 0: 169 temp_log.close() 170 return (backing_file_name, backing_file_fmt) 171 else: 172 multilog("Warning: The %s backing file was not created.\n\n" 173 % backing_file_fmt, sys.stderr, self.log, self.parent_log) 174 self.log.write("Log for the failure:\n" + temp_log.getvalue() + 175 '\n\n') 176 temp_log.close() 177 return (None, None) 178 179 def execute(self, input_commands=None, fuzz_config=None): 180 """ Execute a test. 181 182 The method creates backing and test images, runs test app and analyzes 183 its exit status. If the application was killed by a signal, the test 184 is marked as failed. 185 """ 186 if input_commands is None: 187 commands = self.commands 188 else: 189 commands = input_commands 190 191 os.chdir(self.current_dir) 192 backing_file_name, backing_file_fmt = self._create_backing_file() 193 img_size = image_generator.create_image('test.img', 194 backing_file_name, 195 backing_file_fmt, 196 fuzz_config) 197 for item in commands: 198 shutil.copy('test.img', 'copy.img') 199 # 'off' and 'len' are multiple of the sector size 200 sector_size = 512 201 start = random.randrange(0, img_size + 1, sector_size) 202 end = random.randrange(start, img_size + 1, sector_size) 203 204 if item[0] == 'qemu-img': 205 current_cmd = list(self.qemu_img) 206 elif item[0] == 'qemu-io': 207 current_cmd = list(self.qemu_io) 208 else: 209 multilog("Warning: test command '%s' is not defined.\n" \ 210 % item[0], sys.stderr, self.log, self.parent_log) 211 continue 212 # Replace all placeholders with their real values 213 for v in item[1:]: 214 c = (v 215 .replace('$test_img', 'copy.img') 216 .replace('$off', str(start)) 217 .replace('$len', str(end - start))) 218 current_cmd.append(c) 219 220 # Log string with the test header 221 test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \ 222 "Backing file: %s\n" \ 223 % (self.seed, " ".join(current_cmd), 224 self.current_dir, backing_file_name) 225 226 temp_log = StringIO.StringIO() 227 try: 228 retcode = run_app(temp_log, current_cmd) 229 except OSError, e: 230 multilog(test_summary + "Error: Start of '%s' failed. " \ 231 "Reason: %s\n\n" % (os.path.basename( 232 current_cmd[0]), e[1]), 233 sys.stderr, self.log, self.parent_log) 234 raise TestException 235 236 if retcode < 0: 237 self.log.write(temp_log.getvalue()) 238 multilog(test_summary + "FAIL: Test terminated by signal " + 239 "%s\n\n" % str_signal(-retcode), sys.stderr, self.log, 240 self.parent_log) 241 self.failed = True 242 else: 243 if self.log_all: 244 self.log.write(temp_log.getvalue()) 245 multilog(test_summary + "PASS: Application exited with" + 246 " the code '%d'\n\n" % retcode, sys.stdout, 247 self.log, self.parent_log) 248 temp_log.close() 249 os.remove('copy.img') 250 251 def finish(self): 252 """Restore the test environment after a test execution.""" 253 self.log.close() 254 self.parent_log.close() 255 os.chdir(self.init_path) 256 if self.cleanup and not self.failed: 257 shutil.rmtree(self.current_dir) 258 259if __name__ == '__main__': 260 261 def usage(): 262 print """ 263 Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR 264 265 Set up test environment in TEST_DIR and run a test in it. A module for 266 test image generation should be specified via IMG_GENERATOR. 267 Example: 268 runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2 269 270 Optional arguments: 271 -h, --help display this help and exit 272 -c, --command=JSON run tests for all commands specified in 273 the JSON array 274 -s, --seed=STRING seed for a test image generation, 275 by default will be generated randomly 276 --config=JSON take fuzzer configuration from the JSON 277 array 278 -k, --keep_passed don't remove folders of passed tests 279 -v, --verbose log information about passed tests 280 281 JSON: 282 283 '--command' accepts a JSON array of commands. Each command presents 284 an application under test with all its paramaters as a list of strings, 285 e.g. 286 ["qemu-io", "$test_img", "-c", "write $off $len"] 287 288 Supported application aliases: 'qemu-img' and 'qemu-io'. 289 Supported argument aliases: $test_img for the fuzzed image, $off 290 for an offset, $len for length. 291 292 Values for $off and $len will be generated based on the virtual disk 293 size of the fuzzed image 294 Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and 295 'QEMU_IO' environment variables 296 297 '--config' accepts a JSON array of fields to be fuzzed, e.g. 298 '[["header"], ["header", "version"]]' 299 Each of the list elements can consist of a complex image element only 300 as ["header"] or ["feature_name_table"] or an exact field as 301 ["header", "version"]. In the first case random portion of the element 302 fields will be fuzzed, in the second one the specified field will be 303 fuzzed always. 304 305 If '--config' argument is specified, fields not listed in 306 the configuration array will not be fuzzed. 307 """ 308 309 def run_test(test_id, seed, work_dir, run_log, cleanup, log_all, 310 command, fuzz_config): 311 """Setup environment for one test and execute this test.""" 312 try: 313 test = TestEnv(test_id, seed, work_dir, run_log, cleanup, 314 log_all) 315 except TestException: 316 sys.exit(1) 317 318 # Python 2.4 doesn't support 'finally' and 'except' in the same 'try' 319 # block 320 try: 321 try: 322 test.execute(command, fuzz_config) 323 except TestException: 324 sys.exit(1) 325 finally: 326 test.finish() 327 328 try: 329 opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kv', 330 ['command=', 'help', 'seed=', 'config=', 331 'keep_passed', 'verbose']) 332 except getopt.error, e: 333 print >>sys.stderr, \ 334 "Error: %s\n\nTry 'runner.py --help' for more information" % e 335 sys.exit(1) 336 337 command = None 338 cleanup = True 339 log_all = False 340 seed = None 341 config = None 342 for opt, arg in opts: 343 if opt in ('-h', '--help'): 344 usage() 345 sys.exit() 346 elif opt in ('-c', '--command'): 347 try: 348 command = json.loads(arg) 349 except (TypeError, ValueError, NameError), e: 350 print >>sys.stderr, \ 351 "Error: JSON array of test commands cannot be loaded.\n" \ 352 "Reason: %s" % e 353 sys.exit(1) 354 elif opt in ('-k', '--keep_passed'): 355 cleanup = False 356 elif opt in ('-v', '--verbose'): 357 log_all = True 358 elif opt in ('-s', '--seed'): 359 seed = arg 360 elif opt == '--config': 361 try: 362 config = json.loads(arg) 363 except (TypeError, ValueError, NameError), e: 364 print >>sys.stderr, \ 365 "Error: JSON array with the fuzzer configuration cannot" \ 366 " be loaded\nReason: %s" % e 367 sys.exit(1) 368 369 if not len(args) == 2: 370 print >>sys.stderr, \ 371 "Expected two parameters\nTry 'runner.py --help'" \ 372 " for more information." 373 sys.exit(1) 374 375 work_dir = os.path.realpath(args[0]) 376 # run_log is created in 'main', because multiple tests are expected to 377 # log in it 378 run_log = os.path.join(work_dir, 'run.log') 379 380 # Add the path to the image generator module to sys.path 381 sys.path.append(os.path.realpath(os.path.dirname(args[1]))) 382 # Remove a script extension from image generator module if any 383 generator_name = os.path.splitext(os.path.basename(args[1]))[0] 384 385 try: 386 image_generator = __import__(generator_name) 387 except ImportError, e: 388 print >>sys.stderr, \ 389 "Error: The image generator '%s' cannot be imported.\n" \ 390 "Reason: %s" % (generator_name, e) 391 sys.exit(1) 392 393 # Enable core dumps 394 resource.setrlimit(resource.RLIMIT_CORE, (-1, -1)) 395 # If a seed is specified, only one test will be executed. 396 # Otherwise runner will terminate after a keyboard interruption 397 for test_id in count(1): 398 try: 399 run_test(str(test_id), seed, work_dir, run_log, cleanup, 400 log_all, command, config) 401 except (KeyboardInterrupt, SystemExit): 402 sys.exit(1) 403 404 if seed is not None: 405 break 406