1#!/usr/bin/env python3
2
3"""
4tdc.py - Linux tc (Traffic Control) unit test driver
5
6Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7"""
8
9import re
10import os
11import sys
12import argparse
13import json
14import subprocess
15from collections import OrderedDict
16from string import Template
17
18from tdc_config import *
19from tdc_helper import *
20
21
22USE_NS = True
23
24
25def replace_keywords(cmd):
26    """
27    For a given executable command, substitute any known
28    variables contained within NAMES with the correct values
29    """
30    tcmd = Template(cmd)
31    subcmd = tcmd.safe_substitute(NAMES)
32    return subcmd
33
34
35def exec_cmd(command, nsonly=True):
36    """
37    Perform any required modifications on an executable command, then run
38    it in a subprocess and return the results.
39    """
40    if (USE_NS and nsonly):
41        command = 'ip netns exec $NS ' + command
42
43    if '$' in command:
44        command = replace_keywords(command)
45
46    proc = subprocess.Popen(command,
47        shell=True,
48        stdout=subprocess.PIPE,
49        stderr=subprocess.PIPE)
50    (rawout, serr) = proc.communicate()
51
52    if proc.returncode != 0:
53        foutput = serr.decode("utf-8")
54    else:
55        foutput = rawout.decode("utf-8")
56
57    proc.stdout.close()
58    proc.stderr.close()
59    return proc, foutput
60
61
62def prepare_env(cmdlist):
63    """
64    Execute the setup/teardown commands for a test case. Optionally
65    terminate test execution if the command fails.
66    """
67    for cmdinfo in cmdlist:
68        if (type(cmdinfo) == list):
69            exit_codes = cmdinfo[1:]
70            cmd = cmdinfo[0]
71        else:
72            exit_codes = [0]
73            cmd = cmdinfo
74
75        if (len(cmd) == 0):
76            continue
77
78        (proc, foutput) = exec_cmd(cmd)
79
80        if proc.returncode not in exit_codes:
81            print
82            print("Could not execute:")
83            print(cmd)
84            print("\nError message:")
85            print(foutput)
86            print("\nAborting test run.")
87            ns_destroy()
88            exit(1)
89
90
91def test_runner(filtered_tests, args):
92    """
93    Driver function for the unit tests.
94
95    Prints information about the tests being run, executes the setup and
96    teardown commands and the command under test itself. Also determines
97    success/failure based on the information in the test case and generates
98    TAP output accordingly.
99    """
100    testlist = filtered_tests
101    tcount = len(testlist)
102    index = 1
103    tap = str(index) + ".." + str(tcount) + "\n"
104
105    for tidx in testlist:
106        result = True
107        tresult = ""
108        if "flower" in tidx["category"] and args.device == None:
109            continue
110        print("Test " + tidx["id"] + ": " + tidx["name"])
111        prepare_env(tidx["setup"])
112        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
113        exit_code = p.returncode
114
115        if (exit_code != int(tidx["expExitCode"])):
116            result = False
117            print("exit:", exit_code, int(tidx["expExitCode"]))
118            print(procout)
119        else:
120            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
121            (p, procout) = exec_cmd(tidx["verifyCmd"])
122            match_index = re.findall(match_pattern, procout)
123            if len(match_index) != int(tidx["matchCount"]):
124                result = False
125
126        if result == True:
127            tresult += "ok "
128        else:
129            tresult += "not ok "
130        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
131
132        if result == False:
133            tap += procout
134
135        prepare_env(tidx["teardown"])
136        index += 1
137
138    return tap
139
140
141def ns_create():
142    """
143    Create the network namespace in which the tests will be run and set up
144    the required network devices for it.
145    """
146    if (USE_NS):
147        cmd = 'ip netns add $NS'
148        exec_cmd(cmd, False)
149        cmd = 'ip link add $DEV0 type veth peer name $DEV1'
150        exec_cmd(cmd, False)
151        cmd = 'ip link set $DEV1 netns $NS'
152        exec_cmd(cmd, False)
153        cmd = 'ip link set $DEV0 up'
154        exec_cmd(cmd, False)
155        cmd = 'ip -s $NS link set $DEV1 up'
156        exec_cmd(cmd, False)
157        cmd = 'ip link set $DEV2 netns $NS'
158        exec_cmd(cmd, False)
159        cmd = 'ip -s $NS link set $DEV2 up'
160        exec_cmd(cmd, False)
161
162
163def ns_destroy():
164    """
165    Destroy the network namespace for testing (and any associated network
166    devices as well)
167    """
168    if (USE_NS):
169        cmd = 'ip netns delete $NS'
170        exec_cmd(cmd, False)
171
172
173def has_blank_ids(idlist):
174    """
175    Search the list for empty ID fields and return true/false accordingly.
176    """
177    return not(all(k for k in idlist))
178
179
180def load_from_file(filename):
181    """
182    Open the JSON file containing the test cases and return them as an
183    ordered dictionary object.
184    """
185    with open(filename) as test_data:
186        testlist = json.load(test_data, object_pairs_hook=OrderedDict)
187    idlist = get_id_list(testlist)
188    if (has_blank_ids(idlist)):
189        for k in testlist:
190            k['filename'] = filename
191    return testlist
192
193
194def args_parse():
195    """
196    Create the argument parser.
197    """
198    parser = argparse.ArgumentParser(description='Linux TC unit tests')
199    return parser
200
201
202def set_args(parser):
203    """
204    Set the command line arguments for tdc.
205    """
206    parser.add_argument('-p', '--path', type=str,
207                        help='The full path to the tc executable to use')
208    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
209                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
210    parser.add_argument('-f', '--file', type=str,
211                        help='Run tests from the specified file')
212    parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
213                        help='List all test cases, or those only within the specified category')
214    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
215                        help='Display the test case with specified id')
216    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
217                        help='Execute the single test case with specified ID')
218    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
219                        help='Generate ID numbers for new test cases')
220    parser.add_argument('-d', '--device',
221                        help='Execute the test case in flower category')
222    return parser
223
224
225def check_default_settings(args):
226    """
227    Process any arguments overriding the default settings, and ensure the
228    settings are correct.
229    """
230    # Allow for overriding specific settings
231    global NAMES
232
233    if args.path != None:
234         NAMES['TC'] = args.path
235    if args.device != None:
236         NAMES['DEV2'] = args.device
237    if not os.path.isfile(NAMES['TC']):
238        print("The specified tc path " + NAMES['TC'] + " does not exist.")
239        exit(1)
240
241
242def get_id_list(alltests):
243    """
244    Generate a list of all IDs in the test cases.
245    """
246    return [x["id"] for x in alltests]
247
248
249def check_case_id(alltests):
250    """
251    Check for duplicate test case IDs.
252    """
253    idl = get_id_list(alltests)
254    return [x for x in idl if idl.count(x) > 1]
255
256
257def does_id_exist(alltests, newid):
258    """
259    Check if a given ID already exists in the list of test cases.
260    """
261    idl = get_id_list(alltests)
262    return (any(newid == x for x in idl))
263
264
265def generate_case_ids(alltests):
266    """
267    If a test case has a blank ID field, generate a random hex ID for it
268    and then write the test cases back to disk.
269    """
270    import random
271    for c in alltests:
272        if (c["id"] == ""):
273            while True:
274                newid = str('%04x' % random.randrange(16**4))
275                if (does_id_exist(alltests, newid)):
276                    continue
277                else:
278                    c['id'] = newid
279                    break
280
281    ufilename = []
282    for c in alltests:
283        if ('filename' in c):
284            ufilename.append(c['filename'])
285    ufilename = get_unique_item(ufilename)
286    for f in ufilename:
287        testlist = []
288        for t in alltests:
289            if 'filename' in t:
290                if t['filename'] == f:
291                    del t['filename']
292                    testlist.append(t)
293        outfile = open(f, "w")
294        json.dump(testlist, outfile, indent=4)
295        outfile.close()
296
297
298def get_test_cases(args):
299    """
300    If a test case file is specified, retrieve tests from that file.
301    Otherwise, glob for all json files in subdirectories and load from
302    each one.
303    """
304    import fnmatch
305    if args.file != None:
306        if not os.path.isfile(args.file):
307            print("The specified test case file " + args.file + " does not exist.")
308            exit(1)
309        flist = [args.file]
310    else:
311        flist = []
312        for root, dirnames, filenames in os.walk('tc-tests'):
313            for filename in fnmatch.filter(filenames, '*.json'):
314                flist.append(os.path.join(root, filename))
315    alltests = list()
316    for casefile in flist:
317        alltests = alltests + (load_from_file(casefile))
318    return alltests
319
320
321def set_operation_mode(args):
322    """
323    Load the test case data and process remaining arguments to determine
324    what the script should do for this run, and call the appropriate
325    function.
326    """
327    alltests = get_test_cases(args)
328
329    if args.gen_id:
330        idlist = get_id_list(alltests)
331        if (has_blank_ids(idlist)):
332            alltests = generate_case_ids(alltests)
333        else:
334            print("No empty ID fields found in test files.")
335        exit(0)
336
337    duplicate_ids = check_case_id(alltests)
338    if (len(duplicate_ids) > 0):
339        print("The following test case IDs are not unique:")
340        print(str(set(duplicate_ids)))
341        print("Please correct them before continuing.")
342        exit(1)
343
344    ucat = get_test_categories(alltests)
345
346    if args.showID:
347        show_test_case_by_id(alltests, args.showID[0])
348        exit(0)
349
350    if args.execute:
351        target_id = args.execute[0]
352    else:
353        target_id = ""
354
355    if args.category:
356        if (args.category == '+c'):
357            print("Available categories:")
358            print_sll(ucat)
359            exit(0)
360        else:
361            target_category = args.category
362    else:
363        target_category = ""
364
365
366    testcases = get_categorized_testlist(alltests, ucat)
367
368    if args.list:
369        if (len(args.list) == 0):
370            list_test_cases(alltests)
371            exit(0)
372        elif(len(args.list > 0)):
373            if (args.list not in ucat):
374                print("Unknown category " + args.list)
375                print("Available categories:")
376                print_sll(ucat)
377                exit(1)
378            list_test_cases(testcases[args.list])
379            exit(0)
380
381    if (os.geteuid() != 0):
382        print("This script must be run with root privileges.\n")
383        exit(1)
384
385    ns_create()
386
387    if (len(target_category) == 0):
388        if (len(target_id) > 0):
389            alltests = list(filter(lambda x: target_id in x['id'], alltests))
390            if (len(alltests) == 0):
391                print("Cannot find a test case with ID matching " + target_id)
392                exit(1)
393        catresults = test_runner(alltests, args)
394        print("All test results: " + "\n\n" + catresults)
395    elif (len(target_category) > 0):
396        if (target_category == "flower") and args.device == None:
397            print("Please specify a NIC device (-d) to run category flower")
398            exit(1)
399        if (target_category not in ucat):
400            print("Specified category is not present in this file.")
401            exit(1)
402        else:
403            catresults = test_runner(testcases[target_category], args)
404            print("Category " + target_category + "\n\n" + catresults)
405
406    ns_destroy()
407
408
409def main():
410    """
411    Start of execution; set up argument parser and get the arguments,
412    and start operations.
413    """
414    parser = args_parse()
415    parser = set_args(parser)
416    (args, remaining) = parser.parse_known_args()
417    check_default_settings(args)
418
419    set_operation_mode(args)
420
421    exit(0)
422
423
424if __name__ == "__main__":
425    main()
426