#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import fnmatch
import json
import random
import subprocess
from collections import Counter, OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *


USE_NS = True


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values.

    Unknown $placeholders are left untouched (safe_substitute).
    """
    return Template(cmd).safe_substitute(NAMES)


def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    If USE_NS and nsonly are both true, the command is prefixed so that it
    runs inside the $NS network namespace.

    Returns a (proc, output) tuple: the finished Popen object and the
    decoded text of stderr (when the command failed and wrote to stderr)
    or of stdout (otherwise).
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    # The commands are shell strings taken from the test case files, so
    # they are deliberately handed to the shell unmodified.
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() reads both pipes to EOF, waits for the process and
    # closes the pipes, so no explicit close() is needed afterwards.
    (rawout, serr) = proc.communicate()

    # errors="replace" keeps a stray non-UTF-8 byte in a tool's output
    # from aborting the whole test run with UnicodeDecodeError.
    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="replace")
    else:
        foutput = rawout.decode("utf-8", errors="replace")

    return proc, foutput


def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case. Terminate test
    execution if a command exits with an unexpected code.

    Each entry of cmdlist is either a command string (expected exit code
    0) or a list whose first element is the command and whose remaining
    elements are the acceptable exit codes.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            cmd = cmdinfo[0]
            exit_codes = cmdinfo[1:]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            ns_destroy()
            sys.exit(1)


def test_runner(filtered_tests, args):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.

    Tests in the "flower" category are skipped when no device was given
    on the command line. Returns the TAP report as a string.
    """
    # Drop the skipped tests up front so that the TAP plan ("1..N")
    # matches the number of result lines actually emitted.
    testlist = [t for t in filtered_tests
                if not ("flower" in t["category"] and args.device is None)]
    tap = "1.." + str(len(testlist)) + "\n"

    for index, tidx in enumerate(testlist, start=1):
        result = True
        print("Test " + tidx["id"] + ": " + tidx["name"])
        prepare_env(tidx["setup"])
        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
        exit_code = p.returncode

        if exit_code != int(tidx["expExitCode"]):
            result = False
            print("exit:", exit_code, int(tidx["expExitCode"]))
            print(procout)
        else:
            # The command behaved as expected; now check its effect by
            # counting pattern matches in the verify command's output.
            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
            (p, procout) = exec_cmd(tidx["verifyCmd"])
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False

        tresult = "ok " if result else "not ok "
        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"

        if not result:
            # Attach the output of the last command run as diagnostics.
            tap += procout

        prepare_env(tidx["teardown"])

    return tap


def ns_create():
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.

    Does nothing when USE_NS is false.
    """
    if not USE_NS:
        return

    commands = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for cmd in commands:
        # These run in the host namespace (nsonly=False); exit codes are
        # intentionally not checked here.
        exec_cmd(cmd, False)


def ns_destroy():
    """
    Destroy the network namespace for testing (and any associated network
    devices as well).

    Does nothing when USE_NS is false.
    """
    if USE_NS:
        exec_cmd('ip netns delete $NS', False)


def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.

    A file that is not valid JSON is reported and yields an empty list.
    When the file contains at least one blank ID, every test case loaded
    from it is tagged with a 'filename' key so that generate_case_ids()
    knows which file to write it back to.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if has_blank_ids(idlist):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc and return the parser.
    """
    parser.add_argument('-p', '--path', type=str,
                        help='The full path to the tc executable to use')
    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
    parser.add_argument('-f', '--file', type=str,
                        help='Run tests from the specified file')
    parser.add_argument('-l', '--list', type=str, nargs='?', const="++", metavar='CATEGORY',
                        help='List all test cases, or those only within the specified category')
    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
                        help='Display the test case with specified id')
    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
                        help='Execute the single test case with specified ID')
    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
                        help='Generate ID numbers for new test cases')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser


def check_default_settings(args):
    """
    Process any arguments overriding the default settings, and ensure the
    settings are correct.

    Exits with status 1 if the configured tc executable does not exist.
    """
    # Allow for overriding specific settings. NAMES is only mutated in
    # place, never rebound, so no 'global' declaration is required.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        sys.exit(1)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every ID that occurs more than once, in original order and
    once per occurrence (so an empty list means all IDs are unique).
    """
    idl = get_id_list(alltests)
    # Count once instead of calling list.count() per element (O(n^2)).
    counts = Counter(idl)
    return [x for x in idl if counts[x] > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only test cases carrying a 'filename' key (set by load_from_file for
    files that contained blank IDs) are written back; the key is removed
    before writing. Returns the updated list of test cases.
    """
    used_ids = set(get_id_list(alltests))
    for c in alltests:
        if c["id"] == "":
            # Draw 4-digit hex IDs until one is not already taken.
            while True:
                newid = '%04x' % random.randrange(16**4)
                if newid not in used_ids:
                    break
            c['id'] = newid
            used_ids.add(newid)

    ufilename = [c['filename'] for c in alltests if 'filename' in c]
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                del t['filename']
                testlist.append(t)
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests


def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.

    Exits with status 1 if the specified file does not exist.
    """
    if args.file is not None:
        if not os.path.isfile(args.file):
            print("The specified test case file " + args.file + " does not exist.")
            sys.exit(1)
        flist = [args.file]
    else:
        flist = []
        for root, dirnames, filenames in os.walk('tc-tests'):
            for filename in fnmatch.filter(filenames, '*.json'):
                flist.append(os.path.join(root, filename))

    alltests = list()
    for casefile in flist:
        alltests.extend(load_from_file(casefile))
    return alltests


def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Handles, in order: ID generation (-i), the duplicate ID check, showing
    a single test (-s), listing categories or tests (-c with no value,
    -l), and finally running tests (all, by ID with -e, or by category
    with -c). Running tests requires root privileges.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        idlist = get_id_list(alltests)
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if duplicate_ids:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    if args.category:
        # '+c' is the argparse const used when -c is given without a value.
        if args.category == '+c':
            print("Available categories:")
            print_sll(ucat)
            sys.exit(0)
        target_category = args.category
    else:
        target_category = ""

    testcases = get_categorized_testlist(alltests, ucat)

    if args.list:
        # '++' is the argparse const used when -l is given without a value.
        if args.list == "++":
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Validate the selection before creating the namespace, so that an
    # early exit cannot leave the namespace and veth devices behind.
    if target_category:
        if target_category == "flower" and args.device is None:
            print("Please specify a NIC device (-d) to run category flower")
            sys.exit(1)
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        heading = "Category " + target_category
    else:
        if target_id:
            alltests = [t for t in alltests if target_id in t['id']]
            if not alltests:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        selected = alltests
        heading = "All test results: "

    ns_create()
    try:
        catresults = test_runner(selected, args)
    finally:
        # Always tear the namespace down, even if the run is interrupted.
        # prepare_env() may already have removed it before exiting; the
        # second delete then simply fails and its exit code is ignored.
        ns_destroy()

    print(heading + "\n\n" + catresults)


def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    # Unrecognised arguments are tolerated and ignored.
    (args, remaining) = parser.parse_known_args()
    check_default_settings(args)

    set_operation_mode(args)

    sys.exit(0)


if __name__ == "__main__":
    main()