1#!/usr/bin/env python3
2
3"""
4tdc.py - Linux tc (Traffic Control) unit test driver
5
6Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7"""
8
9import re
10import os
11import sys
12import argparse
13import json
14import subprocess
15from collections import OrderedDict
16from string import Template
17
18from tdc_config import *
19from tdc_helper import *
20
21
22USE_NS = True
23
24
25def replace_keywords(cmd):
26    """
27    For a given executable command, substitute any known
28    variables contained within NAMES with the correct values
29    """
30    tcmd = Template(cmd)
31    subcmd = tcmd.safe_substitute(NAMES)
32    return subcmd
33
34
35def exec_cmd(command, nsonly=True):
36    """
37    Perform any required modifications on an executable command, then run
38    it in a subprocess and return the results.
39    """
40    if (USE_NS and nsonly):
41        command = 'ip netns exec $NS ' + command
42
43    if '$' in command:
44        command = replace_keywords(command)
45
46    proc = subprocess.Popen(command,
47        shell=True,
48        stdout=subprocess.PIPE,
49        stderr=subprocess.PIPE)
50    (rawout, serr) = proc.communicate()
51
52    if proc.returncode != 0:
53        foutput = serr.decode("utf-8")
54    else:
55        foutput = rawout.decode("utf-8")
56
57    proc.stdout.close()
58    proc.stderr.close()
59    return proc, foutput
60
61
62def prepare_env(cmdlist):
63    """
64    Execute the setup/teardown commands for a test case. Optionally
65    terminate test execution if the command fails.
66    """
67    for cmdinfo in cmdlist:
68        if (type(cmdinfo) == list):
69            exit_codes = cmdinfo[1:]
70            cmd = cmdinfo[0]
71        else:
72            exit_codes = [0]
73            cmd = cmdinfo
74
75        if (len(cmd) == 0):
76            continue
77
78        (proc, foutput) = exec_cmd(cmd)
79
80        if proc.returncode not in exit_codes:
81            print
82            print("Could not execute:")
83            print(cmd)
84            print("\nError message:")
85            print(foutput)
86            print("\nAborting test run.")
87            ns_destroy()
88            exit(1)
89
90
91def test_runner(filtered_tests):
92    """
93    Driver function for the unit tests.
94
95    Prints information about the tests being run, executes the setup and
96    teardown commands and the command under test itself. Also determines
97    success/failure based on the information in the test case and generates
98    TAP output accordingly.
99    """
100    testlist = filtered_tests
101    tcount = len(testlist)
102    index = 1
103    tap = str(index) + ".." + str(tcount) + "\n"
104
105    for tidx in testlist:
106        result = True
107        tresult = ""
108        print("Test " + tidx["id"] + ": " + tidx["name"])
109        prepare_env(tidx["setup"])
110        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
111        exit_code = p.returncode
112
113        if (exit_code != int(tidx["expExitCode"])):
114            result = False
115            print("exit:", exit_code, int(tidx["expExitCode"]))
116            print(procout)
117        else:
118            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
119            (p, procout) = exec_cmd(tidx["verifyCmd"])
120            match_index = re.findall(match_pattern, procout)
121            if len(match_index) != int(tidx["matchCount"]):
122                result = False
123
124        if result == True:
125            tresult += "ok "
126        else:
127            tresult += "not ok "
128        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
129
130        if result == False:
131            tap += procout
132
133        prepare_env(tidx["teardown"])
134        index += 1
135
136    return tap
137
138
139def ns_create():
140    """
141    Create the network namespace in which the tests will be run and set up
142    the required network devices for it.
143    """
144    if (USE_NS):
145        cmd = 'ip netns add $NS'
146        exec_cmd(cmd, False)
147        cmd = 'ip link add $DEV0 type veth peer name $DEV1'
148        exec_cmd(cmd, False)
149        cmd = 'ip link set $DEV1 netns $NS'
150        exec_cmd(cmd, False)
151        cmd = 'ip link set $DEV0 up'
152        exec_cmd(cmd, False)
153        cmd = 'ip -s $NS link set $DEV1 up'
154        exec_cmd(cmd, False)
155
156
157def ns_destroy():
158    """
159    Destroy the network namespace for testing (and any associated network
160    devices as well)
161    """
162    if (USE_NS):
163        cmd = 'ip netns delete $NS'
164        exec_cmd(cmd, False)
165
166
167def has_blank_ids(idlist):
168    """
169    Search the list for empty ID fields and return true/false accordingly.
170    """
171    return not(all(k for k in idlist))
172
173
174def load_from_file(filename):
175    """
176    Open the JSON file containing the test cases and return them as an
177    ordered dictionary object.
178    """
179    with open(filename) as test_data:
180        testlist = json.load(test_data, object_pairs_hook=OrderedDict)
181    idlist = get_id_list(testlist)
182    if (has_blank_ids(idlist)):
183        for k in testlist:
184            k['filename'] = filename
185    return testlist
186
187
188def args_parse():
189    """
190    Create the argument parser.
191    """
192    parser = argparse.ArgumentParser(description='Linux TC unit tests')
193    return parser
194
195
196def set_args(parser):
197    """
198    Set the command line arguments for tdc.
199    """
200    parser.add_argument('-p', '--path', type=str,
201                        help='The full path to the tc executable to use')
202    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
203                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
204    parser.add_argument('-f', '--file', type=str,
205                        help='Run tests from the specified file')
206    parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
207                        help='List all test cases, or those only within the specified category')
208    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
209                        help='Display the test case with specified id')
210    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
211                        help='Execute the single test case with specified ID')
212    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
213                        help='Generate ID numbers for new test cases')
214    return parser
215    return parser
216
217
218def check_default_settings(args):
219    """
220    Process any arguments overriding the default settings, and ensure the
221    settings are correct.
222    """
223    # Allow for overriding specific settings
224    global NAMES
225
226    if args.path != None:
227         NAMES['TC'] = args.path
228    if not os.path.isfile(NAMES['TC']):
229        print("The specified tc path " + NAMES['TC'] + " does not exist.")
230        exit(1)
231
232
233def get_id_list(alltests):
234    """
235    Generate a list of all IDs in the test cases.
236    """
237    return [x["id"] for x in alltests]
238
239
240def check_case_id(alltests):
241    """
242    Check for duplicate test case IDs.
243    """
244    idl = get_id_list(alltests)
245    return [x for x in idl if idl.count(x) > 1]
246
247
248def does_id_exist(alltests, newid):
249    """
250    Check if a given ID already exists in the list of test cases.
251    """
252    idl = get_id_list(alltests)
253    return (any(newid == x for x in idl))
254
255
256def generate_case_ids(alltests):
257    """
258    If a test case has a blank ID field, generate a random hex ID for it
259    and then write the test cases back to disk.
260    """
261    import random
262    for c in alltests:
263        if (c["id"] == ""):
264            while True:
265                newid = str('%04x' % random.randrange(16**4))
266                if (does_id_exist(alltests, newid)):
267                    continue
268                else:
269                    c['id'] = newid
270                    break
271
272    ufilename = []
273    for c in alltests:
274        if ('filename' in c):
275            ufilename.append(c['filename'])
276    ufilename = get_unique_item(ufilename)
277    for f in ufilename:
278        testlist = []
279        for t in alltests:
280            if 'filename' in t:
281                if t['filename'] == f:
282                    del t['filename']
283                    testlist.append(t)
284        outfile = open(f, "w")
285        json.dump(testlist, outfile, indent=4)
286        outfile.close()
287
288
289def get_test_cases(args):
290    """
291    If a test case file is specified, retrieve tests from that file.
292    Otherwise, glob for all json files in subdirectories and load from
293    each one.
294    """
295    import fnmatch
296    if args.file != None:
297        if not os.path.isfile(args.file):
298            print("The specified test case file " + args.file + " does not exist.")
299            exit(1)
300        flist = [args.file]
301    else:
302        flist = []
303        for root, dirnames, filenames in os.walk('tc-tests'):
304            for filename in fnmatch.filter(filenames, '*.json'):
305                flist.append(os.path.join(root, filename))
306    alltests = list()
307    for casefile in flist:
308        alltests = alltests + (load_from_file(casefile))
309    return alltests
310
311
312def set_operation_mode(args):
313    """
314    Load the test case data and process remaining arguments to determine
315    what the script should do for this run, and call the appropriate
316    function.
317    """
318    alltests = get_test_cases(args)
319
320    if args.gen_id:
321        idlist = get_id_list(alltests)
322        if (has_blank_ids(idlist)):
323            alltests = generate_case_ids(alltests)
324        else:
325            print("No empty ID fields found in test files.")
326        exit(0)
327
328    duplicate_ids = check_case_id(alltests)
329    if (len(duplicate_ids) > 0):
330        print("The following test case IDs are not unique:")
331        print(str(set(duplicate_ids)))
332        print("Please correct them before continuing.")
333        exit(1)
334
335    ucat = get_test_categories(alltests)
336
337    if args.showID:
338        show_test_case_by_id(alltests, args.showID[0])
339        exit(0)
340
341    if args.execute:
342        target_id = args.execute[0]
343    else:
344        target_id = ""
345
346    if args.category:
347        if (args.category == '+c'):
348            print("Available categories:")
349            print_sll(ucat)
350            exit(0)
351        else:
352            target_category = args.category
353    else:
354        target_category = ""
355
356
357    testcases = get_categorized_testlist(alltests, ucat)
358
359    if args.list:
360        if (len(args.list) == 0):
361            list_test_cases(alltests)
362            exit(0)
363        elif(len(args.list > 0)):
364            if (args.list not in ucat):
365                print("Unknown category " + args.list)
366                print("Available categories:")
367                print_sll(ucat)
368                exit(1)
369            list_test_cases(testcases[args.list])
370            exit(0)
371
372    if (os.geteuid() != 0):
373        print("This script must be run with root privileges.\n")
374        exit(1)
375
376    ns_create()
377
378    if (len(target_category) == 0):
379        if (len(target_id) > 0):
380            alltests = list(filter(lambda x: target_id in x['id'], alltests))
381            if (len(alltests) == 0):
382                print("Cannot find a test case with ID matching " + target_id)
383                exit(1)
384        catresults = test_runner(alltests)
385        print("All test results: " + "\n\n" + catresults)
386    elif (len(target_category) > 0):
387        if (target_category not in ucat):
388            print("Specified category is not present in this file.")
389            exit(1)
390        else:
391            catresults = test_runner(testcases[target_category])
392            print("Category " + target_category + "\n\n" + catresults)
393
394    ns_destroy()
395
396
397def main():
398    """
399    Start of execution; set up argument parser and get the arguments,
400    and start operations.
401    """
402    parser = args_parse()
403    parser = set_args(parser)
404    (args, remaining) = parser.parse_known_args()
405    check_default_settings(args)
406
407    set_operation_mode(args)
408
409    exit(0)
410
411
412if __name__ == "__main__":
413    main()
414