xref: /openbmc/linux/tools/testing/selftests/tc-testing/tdc.py (revision b24413180f5600bcb3bb70fbed5cf186b60864bd)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import json
15import subprocess
16from collections import OrderedDict
17from string import Template
18
19from tdc_config import *
20from tdc_helper import *
21
22
23USE_NS = True
24
25
26def replace_keywords(cmd):
27    """
28    For a given executable command, substitute any known
29    variables contained within NAMES with the correct values
30    """
31    tcmd = Template(cmd)
32    subcmd = tcmd.safe_substitute(NAMES)
33    return subcmd
34
35
36def exec_cmd(command, nsonly=True):
37    """
38    Perform any required modifications on an executable command, then run
39    it in a subprocess and return the results.
40    """
41    if (USE_NS and nsonly):
42        command = 'ip netns exec $NS ' + command
43
44    if '$' in command:
45        command = replace_keywords(command)
46
47    proc = subprocess.Popen(command,
48        shell=True,
49        stdout=subprocess.PIPE,
50        stderr=subprocess.PIPE)
51    (rawout, serr) = proc.communicate()
52
53    if proc.returncode != 0:
54        foutput = serr.decode("utf-8")
55    else:
56        foutput = rawout.decode("utf-8")
57
58    proc.stdout.close()
59    proc.stderr.close()
60    return proc, foutput
61
62
63def prepare_env(cmdlist):
64    """
65    Execute the setup/teardown commands for a test case. Optionally
66    terminate test execution if the command fails.
67    """
68    for cmdinfo in cmdlist:
69        if (type(cmdinfo) == list):
70            exit_codes = cmdinfo[1:]
71            cmd = cmdinfo[0]
72        else:
73            exit_codes = [0]
74            cmd = cmdinfo
75
76        if (len(cmd) == 0):
77            continue
78
79        (proc, foutput) = exec_cmd(cmd)
80
81        if proc.returncode not in exit_codes:
82            print
83            print("Could not execute:")
84            print(cmd)
85            print("\nError message:")
86            print(foutput)
87            print("\nAborting test run.")
88            ns_destroy()
89            exit(1)
90
91
92def test_runner(filtered_tests):
93    """
94    Driver function for the unit tests.
95
96    Prints information about the tests being run, executes the setup and
97    teardown commands and the command under test itself. Also determines
98    success/failure based on the information in the test case and generates
99    TAP output accordingly.
100    """
101    testlist = filtered_tests
102    tcount = len(testlist)
103    index = 1
104    tap = str(index) + ".." + str(tcount) + "\n"
105
106    for tidx in testlist:
107        result = True
108        tresult = ""
109        print("Test " + tidx["id"] + ": " + tidx["name"])
110        prepare_env(tidx["setup"])
111        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
112        exit_code = p.returncode
113
114        if (exit_code != int(tidx["expExitCode"])):
115            result = False
116            print("exit:", exit_code, int(tidx["expExitCode"]))
117            print(procout)
118        else:
119            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
120            (p, procout) = exec_cmd(tidx["verifyCmd"])
121            match_index = re.findall(match_pattern, procout)
122            if len(match_index) != int(tidx["matchCount"]):
123                result = False
124
125        if result == True:
126            tresult += "ok "
127        else:
128            tresult += "not ok "
129        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
130
131        if result == False:
132            tap += procout
133
134        prepare_env(tidx["teardown"])
135        index += 1
136
137    return tap
138
139
140def ns_create():
141    """
142    Create the network namespace in which the tests will be run and set up
143    the required network devices for it.
144    """
145    if (USE_NS):
146        cmd = 'ip netns add $NS'
147        exec_cmd(cmd, False)
148        cmd = 'ip link add $DEV0 type veth peer name $DEV1'
149        exec_cmd(cmd, False)
150        cmd = 'ip link set $DEV1 netns $NS'
151        exec_cmd(cmd, False)
152        cmd = 'ip link set $DEV0 up'
153        exec_cmd(cmd, False)
154        cmd = 'ip -s $NS link set $DEV1 up'
155        exec_cmd(cmd, False)
156
157
158def ns_destroy():
159    """
160    Destroy the network namespace for testing (and any associated network
161    devices as well)
162    """
163    if (USE_NS):
164        cmd = 'ip netns delete $NS'
165        exec_cmd(cmd, False)
166
167
168def has_blank_ids(idlist):
169    """
170    Search the list for empty ID fields and return true/false accordingly.
171    """
172    return not(all(k for k in idlist))
173
174
175def load_from_file(filename):
176    """
177    Open the JSON file containing the test cases and return them as an
178    ordered dictionary object.
179    """
180    with open(filename) as test_data:
181        testlist = json.load(test_data, object_pairs_hook=OrderedDict)
182    idlist = get_id_list(testlist)
183    if (has_blank_ids(idlist)):
184        for k in testlist:
185            k['filename'] = filename
186    return testlist
187
188
189def args_parse():
190    """
191    Create the argument parser.
192    """
193    parser = argparse.ArgumentParser(description='Linux TC unit tests')
194    return parser
195
196
197def set_args(parser):
198    """
199    Set the command line arguments for tdc.
200    """
201    parser.add_argument('-p', '--path', type=str,
202                        help='The full path to the tc executable to use')
203    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
204                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
205    parser.add_argument('-f', '--file', type=str,
206                        help='Run tests from the specified file')
207    parser.add_argument('-l', '--list', type=str, nargs='?', const="", metavar='CATEGORY',
208                        help='List all test cases, or those only within the specified category')
209    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
210                        help='Display the test case with specified id')
211    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
212                        help='Execute the single test case with specified ID')
213    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
214                        help='Generate ID numbers for new test cases')
215    return parser
216    return parser
217
218
219def check_default_settings(args):
220    """
221    Process any arguments overriding the default settings, and ensure the
222    settings are correct.
223    """
224    # Allow for overriding specific settings
225    global NAMES
226
227    if args.path != None:
228         NAMES['TC'] = args.path
229    if not os.path.isfile(NAMES['TC']):
230        print("The specified tc path " + NAMES['TC'] + " does not exist.")
231        exit(1)
232
233
234def get_id_list(alltests):
235    """
236    Generate a list of all IDs in the test cases.
237    """
238    return [x["id"] for x in alltests]
239
240
241def check_case_id(alltests):
242    """
243    Check for duplicate test case IDs.
244    """
245    idl = get_id_list(alltests)
246    return [x for x in idl if idl.count(x) > 1]
247
248
249def does_id_exist(alltests, newid):
250    """
251    Check if a given ID already exists in the list of test cases.
252    """
253    idl = get_id_list(alltests)
254    return (any(newid == x for x in idl))
255
256
257def generate_case_ids(alltests):
258    """
259    If a test case has a blank ID field, generate a random hex ID for it
260    and then write the test cases back to disk.
261    """
262    import random
263    for c in alltests:
264        if (c["id"] == ""):
265            while True:
266                newid = str('%04x' % random.randrange(16**4))
267                if (does_id_exist(alltests, newid)):
268                    continue
269                else:
270                    c['id'] = newid
271                    break
272
273    ufilename = []
274    for c in alltests:
275        if ('filename' in c):
276            ufilename.append(c['filename'])
277    ufilename = get_unique_item(ufilename)
278    for f in ufilename:
279        testlist = []
280        for t in alltests:
281            if 'filename' in t:
282                if t['filename'] == f:
283                    del t['filename']
284                    testlist.append(t)
285        outfile = open(f, "w")
286        json.dump(testlist, outfile, indent=4)
287        outfile.close()
288
289
290def get_test_cases(args):
291    """
292    If a test case file is specified, retrieve tests from that file.
293    Otherwise, glob for all json files in subdirectories and load from
294    each one.
295    """
296    import fnmatch
297    if args.file != None:
298        if not os.path.isfile(args.file):
299            print("The specified test case file " + args.file + " does not exist.")
300            exit(1)
301        flist = [args.file]
302    else:
303        flist = []
304        for root, dirnames, filenames in os.walk('tc-tests'):
305            for filename in fnmatch.filter(filenames, '*.json'):
306                flist.append(os.path.join(root, filename))
307    alltests = list()
308    for casefile in flist:
309        alltests = alltests + (load_from_file(casefile))
310    return alltests
311
312
313def set_operation_mode(args):
314    """
315    Load the test case data and process remaining arguments to determine
316    what the script should do for this run, and call the appropriate
317    function.
318    """
319    alltests = get_test_cases(args)
320
321    if args.gen_id:
322        idlist = get_id_list(alltests)
323        if (has_blank_ids(idlist)):
324            alltests = generate_case_ids(alltests)
325        else:
326            print("No empty ID fields found in test files.")
327        exit(0)
328
329    duplicate_ids = check_case_id(alltests)
330    if (len(duplicate_ids) > 0):
331        print("The following test case IDs are not unique:")
332        print(str(set(duplicate_ids)))
333        print("Please correct them before continuing.")
334        exit(1)
335
336    ucat = get_test_categories(alltests)
337
338    if args.showID:
339        show_test_case_by_id(alltests, args.showID[0])
340        exit(0)
341
342    if args.execute:
343        target_id = args.execute[0]
344    else:
345        target_id = ""
346
347    if args.category:
348        if (args.category == '+c'):
349            print("Available categories:")
350            print_sll(ucat)
351            exit(0)
352        else:
353            target_category = args.category
354    else:
355        target_category = ""
356
357
358    testcases = get_categorized_testlist(alltests, ucat)
359
360    if args.list:
361        if (len(args.list) == 0):
362            list_test_cases(alltests)
363            exit(0)
364        elif(len(args.list > 0)):
365            if (args.list not in ucat):
366                print("Unknown category " + args.list)
367                print("Available categories:")
368                print_sll(ucat)
369                exit(1)
370            list_test_cases(testcases[args.list])
371            exit(0)
372
373    if (os.geteuid() != 0):
374        print("This script must be run with root privileges.\n")
375        exit(1)
376
377    ns_create()
378
379    if (len(target_category) == 0):
380        if (len(target_id) > 0):
381            alltests = list(filter(lambda x: target_id in x['id'], alltests))
382            if (len(alltests) == 0):
383                print("Cannot find a test case with ID matching " + target_id)
384                exit(1)
385        catresults = test_runner(alltests)
386        print("All test results: " + "\n\n" + catresults)
387    elif (len(target_category) > 0):
388        if (target_category not in ucat):
389            print("Specified category is not present in this file.")
390            exit(1)
391        else:
392            catresults = test_runner(testcases[target_category])
393            print("Category " + target_category + "\n\n" + catresults)
394
395    ns_destroy()
396
397
398def main():
399    """
400    Start of execution; set up argument parser and get the arguments,
401    and start operations.
402    """
403    parser = args_parse()
404    parser = set_args(parser)
405    (args, remaining) = parser.parse_known_args()
406    check_default_settings(args)
407
408    set_operation_mode(args)
409
410    exit(0)
411
412
413if __name__ == "__main__":
414    main()
415