1#!/usr/bin/env python3 2 3""" 4tdc.py - Linux tc (Traffic Control) unit test driver 5 6Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 7""" 8 9import re 10import os 11import sys 12import argparse 13import json 14import subprocess 15from collections import OrderedDict 16from string import Template 17 18from tdc_config import * 19from tdc_helper import * 20 21 22USE_NS = True 23 24 25def replace_keywords(cmd): 26 """ 27 For a given executable command, substitute any known 28 variables contained within NAMES with the correct values 29 """ 30 tcmd = Template(cmd) 31 subcmd = tcmd.safe_substitute(NAMES) 32 return subcmd 33 34 35def exec_cmd(command, nsonly=True): 36 """ 37 Perform any required modifications on an executable command, then run 38 it in a subprocess and return the results. 39 """ 40 if (USE_NS and nsonly): 41 command = 'ip netns exec $NS ' + command 42 43 if '$' in command: 44 command = replace_keywords(command) 45 46 proc = subprocess.Popen(command, 47 shell=True, 48 stdout=subprocess.PIPE, 49 stderr=subprocess.PIPE) 50 (rawout, serr) = proc.communicate() 51 52 if proc.returncode != 0: 53 foutput = serr.decode("utf-8") 54 else: 55 foutput = rawout.decode("utf-8") 56 57 proc.stdout.close() 58 proc.stderr.close() 59 return proc, foutput 60 61 62def prepare_env(cmdlist): 63 """ 64 Execute the setup/teardown commands for a test case. Optionally 65 terminate test execution if the command fails. 66 """ 67 for cmdinfo in cmdlist: 68 if (type(cmdinfo) == list): 69 exit_codes = cmdinfo[1:] 70 cmd = cmdinfo[0] 71 else: 72 exit_codes = [0] 73 cmd = cmdinfo 74 75 if (len(cmd) == 0): 76 continue 77 78 (proc, foutput) = exec_cmd(cmd) 79 80 if proc.returncode not in exit_codes: 81 print 82 print("Could not execute:") 83 print(cmd) 84 print("\nError message:") 85 print(foutput) 86 print("\nAborting test run.") 87 ns_destroy() 88 exit(1) 89 90 91def test_runner(filtered_tests): 92 """ 93 Driver function for the unit tests. 94 95 Prints information about the tests being run, executes the setup and 96 teardown commands and the command under test itself. Also determines 97 success/failure based on the information in the test case and generates 98 TAP output accordingly. 99 """ 100 testlist = filtered_tests 101 tcount = len(testlist) 102 index = 1 103 tap = str(index) + ".." + str(tcount) + "\n" 104 105 for tidx in testlist: 106 result = True 107 tresult = "" 108 print("Test " + tidx["id"] + ": " + tidx["name"]) 109 prepare_env(tidx["setup"]) 110 (p, procout) = exec_cmd(tidx["cmdUnderTest"]) 111 exit_code = p.returncode 112 113 if (exit_code != int(tidx["expExitCode"])): 114 result = False 115 print("exit:", exit_code, int(tidx["expExitCode"])) 116 print(procout) 117 else: 118 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL) 119 (p, procout) = exec_cmd(tidx["verifyCmd"]) 120 match_index = re.findall(match_pattern, procout) 121 if len(match_index) != int(tidx["matchCount"]): 122 result = False 123 124 if result == True: 125 tresult += "ok " 126 else: 127 tresult += "not ok " 128 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n" 129 130 if result == False: 131 tap += procout 132 133 prepare_env(tidx["teardown"]) 134 index += 1 135 136 return tap 137 138 139def ns_create(): 140 """ 141 Create the network namespace in which the tests will be run and set up 142 the required network devices for it. 143 """ 144 if (USE_NS): 145 cmd = 'ip netns add $NS' 146 exec_cmd(cmd, False) 147 cmd = 'ip link add $DEV0 type veth peer name $DEV1' 148 exec_cmd(cmd, False) 149 cmd = 'ip link set $DEV1 netns $NS' 150 exec_cmd(cmd, False) 151 cmd = 'ip link set $DEV0 up' 152 exec_cmd(cmd, False) 153 cmd = 'ip -s $NS link set $DEV1 up' 154 exec_cmd(cmd, False) 155 156 157def ns_destroy(): 158 """ 159 Destroy the network namespace for testing (and any associated network 160 devices as well) 161 """ 162 if (USE_NS): 163 cmd = 'ip netns delete $NS' 164 exec_cmd(cmd, False) 165 166 167def has_blank_ids(idlist): 168 """ 169 Search the list for empty ID fields and return true/false accordingly. 170 """ 171 return not(all(k for k in idlist)) 172 173 174def load_from_file(filename): 175 """ 176 Open the JSON file containing the test cases and return them as an 177 ordered dictionary object. 178 """ 179 with open(filename) as test_data: 180 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 181 idlist = get_id_list(testlist) 182 if (has_blank_ids(idlist)): 183 for k in testlist: 184 k['filename'] = filename 185 return testlist 186 187 188def args_parse(): 189 """ 190 Create the argument parser. 191 """ 192 parser = argparse.ArgumentParser(description='Linux TC unit tests') 193 return parser 194 195 196def set_args(parser): 197 """ 198 Set the command line arguments for tdc. 199 """ 200 parser.add_argument('-p', '--path', type=str, 201 help='The full path to the tc executable to use') 202 parser.add_argument('-c', '--category', type=str, nargs='?', const='+c', 203 help='Run tests only from the specified category, or if no category is specified, list known categories.') 204 parser.add_argument('-f', '--file', type=str, 205 help='Run tests from the specified file') 206 parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY', 207 help='List all test cases, or those only within the specified category') 208 parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID', 209 help='Display the test case with specified id') 210 parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID', 211 help='Execute the single test case with specified ID') 212 parser.add_argument('-i', '--id', action='store_true', dest='gen_id', 213 help='Generate ID numbers for new test cases') 214 return parser 215 return parser 216 217 218def check_default_settings(args): 219 """ 220 Process any arguments overriding the default settings, and ensure the 221 settings are correct. 222 """ 223 # Allow for overriding specific settings 224 global NAMES 225 226 if args.path != None: 227 NAMES['TC'] = args.path 228 if not os.path.isfile(NAMES['TC']): 229 print("The specified tc path " + NAMES['TC'] + " does not exist.") 230 exit(1) 231 232 233def get_id_list(alltests): 234 """ 235 Generate a list of all IDs in the test cases. 236 """ 237 return [x["id"] for x in alltests] 238 239 240def check_case_id(alltests): 241 """ 242 Check for duplicate test case IDs. 243 """ 244 idl = get_id_list(alltests) 245 return [x for x in idl if idl.count(x) > 1] 246 247 248def does_id_exist(alltests, newid): 249 """ 250 Check if a given ID already exists in the list of test cases. 251 """ 252 idl = get_id_list(alltests) 253 return (any(newid == x for x in idl)) 254 255 256def generate_case_ids(alltests): 257 """ 258 If a test case has a blank ID field, generate a random hex ID for it 259 and then write the test cases back to disk. 260 """ 261 import random 262 for c in alltests: 263 if (c["id"] == ""): 264 while True: 265 newid = str('%04x' % random.randrange(16**4)) 266 if (does_id_exist(alltests, newid)): 267 continue 268 else: 269 c['id'] = newid 270 break 271 272 ufilename = [] 273 for c in alltests: 274 if ('filename' in c): 275 ufilename.append(c['filename']) 276 ufilename = get_unique_item(ufilename) 277 for f in ufilename: 278 testlist = [] 279 for t in alltests: 280 if 'filename' in t: 281 if t['filename'] == f: 282 del t['filename'] 283 testlist.append(t) 284 outfile = open(f, "w") 285 json.dump(testlist, outfile, indent=4) 286 outfile.close() 287 288 289def get_test_cases(args): 290 """ 291 If a test case file is specified, retrieve tests from that file. 292 Otherwise, glob for all json files in subdirectories and load from 293 each one. 294 """ 295 import fnmatch 296 if args.file != None: 297 if not os.path.isfile(args.file): 298 print("The specified test case file " + args.file + " does not exist.") 299 exit(1) 300 flist = [args.file] 301 else: 302 flist = [] 303 for root, dirnames, filenames in os.walk('tc-tests'): 304 for filename in fnmatch.filter(filenames, '*.json'): 305 flist.append(os.path.join(root, filename)) 306 alltests = list() 307 for casefile in flist: 308 alltests = alltests + (load_from_file(casefile)) 309 return alltests 310 311 312def set_operation_mode(args): 313 """ 314 Load the test case data and process remaining arguments to determine 315 what the script should do for this run, and call the appropriate 316 function. 317 """ 318 alltests = get_test_cases(args) 319 320 if args.gen_id: 321 idlist = get_id_list(alltests) 322 if (has_blank_ids(idlist)): 323 alltests = generate_case_ids(alltests) 324 else: 325 print("No empty ID fields found in test files.") 326 exit(0) 327 328 duplicate_ids = check_case_id(alltests) 329 if (len(duplicate_ids) > 0): 330 print("The following test case IDs are not unique:") 331 print(str(set(duplicate_ids))) 332 print("Please correct them before continuing.") 333 exit(1) 334 335 ucat = get_test_categories(alltests) 336 337 if args.showID: 338 show_test_case_by_id(alltests, args.showID[0]) 339 exit(0) 340 341 if args.execute: 342 target_id = args.execute[0] 343 else: 344 target_id = "" 345 346 if args.category: 347 if (args.category == '+c'): 348 print("Available categories:") 349 print_sll(ucat) 350 exit(0) 351 else: 352 target_category = args.category 353 else: 354 target_category = "" 355 356 357 testcases = get_categorized_testlist(alltests, ucat) 358 359 if args.list: 360 if (len(args.list) == 0): 361 list_test_cases(alltests) 362 exit(0) 363 elif(len(args.list > 0)): 364 if (args.list not in ucat): 365 print("Unknown category " + args.list) 366 print("Available categories:") 367 print_sll(ucat) 368 exit(1) 369 list_test_cases(testcases[args.list]) 370 exit(0) 371 372 if (os.geteuid() != 0): 373 print("This script must be run with root privileges.\n") 374 exit(1) 375 376 ns_create() 377 378 if (len(target_category) == 0): 379 if (len(target_id) > 0): 380 alltests = list(filter(lambda x: target_id in x['id'], alltests)) 381 if (len(alltests) == 0): 382 print("Cannot find a test case with ID matching " + target_id) 383 exit(1) 384 catresults = test_runner(alltests) 385 print("All test results: " + "\n\n" + catresults) 386 elif (len(target_category) > 0): 387 if (target_category not in ucat): 388 print("Specified category is not present in this file.") 389 exit(1) 390 else: 391 catresults = test_runner(testcases[target_category]) 392 print("Category " + target_category + "\n\n" + catresults) 393 394 ns_destroy() 395 396 397def main(): 398 """ 399 Start of execution; set up argument parser and get the arguments, 400 and start operations. 401 """ 402 parser = args_parse() 403 parser = set_args(parser) 404 (args, remaining) = parser.parse_known_args() 405 check_default_settings(args) 406 407 set_operation_mode(args) 408 409 exit(0) 410 411 412if __name__ == "__main__": 413 main() 414