xref: /openbmc/linux/tools/testing/selftests/tc-testing/tdc.py (revision 4f727ecefefbd180de10e25b3e74c03dce3f1e75)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import importlib
15import json
16import subprocess
17import time
18import traceback
19from collections import OrderedDict
20from string import Template
21
22from tdc_config import *
23from tdc_helper import *
24
25import TdcPlugin
26from TdcResults import *
27
28
29class PluginMgrTestFail(Exception):
30    def __init__(self, stage, output, message):
31        self.stage = stage
32        self.output = output
33        self.message = message
34
35class PluginMgr:
36    def __init__(self, argparser):
37        super().__init__()
38        self.plugins = {}
39        self.plugin_instances = []
40        self.args = []
41        self.argparser = argparser
42
43        # TODO, put plugins in order
44        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
45        for dirpath, dirnames, filenames in os.walk(plugindir):
46            for fn in filenames:
47                if (fn.endswith('.py') and
48                    not fn == '__init__.py' and
49                    not fn.startswith('#') and
50                    not fn.startswith('.#')):
51                    mn = fn[0:-3]
52                    foo = importlib.import_module('plugins.' + mn)
53                    self.plugins[mn] = foo
54                    self.plugin_instances.append(foo.SubPlugin())
55
56    def call_pre_suite(self, testcount, testidlist):
57        for pgn_inst in self.plugin_instances:
58            pgn_inst.pre_suite(testcount, testidlist)
59
60    def call_post_suite(self, index):
61        for pgn_inst in reversed(self.plugin_instances):
62            pgn_inst.post_suite(index)
63
64    def call_pre_case(self, testid, test_name, *, test_skip=False):
65        for pgn_inst in self.plugin_instances:
66            try:
67                pgn_inst.pre_case(testid, test_name, test_skip)
68            except Exception as ee:
69                print('exception {} in call to pre_case for {} plugin'.
70                      format(ee, pgn_inst.__class__))
71                print('test_ordinal is {}'.format(test_ordinal))
72                print('testid is {}'.format(testid))
73                raise
74
75    def call_post_case(self):
76        for pgn_inst in reversed(self.plugin_instances):
77            pgn_inst.post_case()
78
79    def call_pre_execute(self):
80        for pgn_inst in self.plugin_instances:
81            pgn_inst.pre_execute()
82
83    def call_post_execute(self):
84        for pgn_inst in reversed(self.plugin_instances):
85            pgn_inst.post_execute()
86
87    def call_add_args(self, parser):
88        for pgn_inst in self.plugin_instances:
89            parser = pgn_inst.add_args(parser)
90        return parser
91
92    def call_check_args(self, args, remaining):
93        for pgn_inst in self.plugin_instances:
94            pgn_inst.check_args(args, remaining)
95
96    def call_adjust_command(self, stage, command):
97        for pgn_inst in self.plugin_instances:
98            command = pgn_inst.adjust_command(stage, command)
99        return command
100
101    @staticmethod
102    def _make_argparser(args):
103        self.argparser = argparse.ArgumentParser(
104            description='Linux TC unit tests')
105
106def replace_keywords(cmd):
107    """
108    For a given executable command, substitute any known
109    variables contained within NAMES with the correct values
110    """
111    tcmd = Template(cmd)
112    subcmd = tcmd.safe_substitute(NAMES)
113    return subcmd
114
115
116def exec_cmd(args, pm, stage, command):
117    """
118    Perform any required modifications on an executable command, then run
119    it in a subprocess and return the results.
120    """
121    if len(command.strip()) == 0:
122        return None, None
123    if '$' in command:
124        command = replace_keywords(command)
125
126    command = pm.call_adjust_command(stage, command)
127    if args.verbose > 0:
128        print('command "{}"'.format(command))
129    proc = subprocess.Popen(command,
130        shell=True,
131        stdout=subprocess.PIPE,
132        stderr=subprocess.PIPE,
133        env=ENVIR)
134
135    try:
136        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
137        if proc.returncode != 0 and len(serr) > 0:
138            foutput = serr.decode("utf-8", errors="ignore")
139        else:
140            foutput = rawout.decode("utf-8", errors="ignore")
141    except subprocess.TimeoutExpired:
142        foutput = "Command \"{}\" timed out\n".format(command)
143        proc.returncode = 255
144
145    proc.stdout.close()
146    proc.stderr.close()
147    return proc, foutput
148
149
150def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
151    """
152    Execute the setup/teardown commands for a test case.
153    Optionally terminate test execution if the command fails.
154    """
155    if args.verbose > 0:
156        print('{}'.format(prefix))
157    for cmdinfo in cmdlist:
158        if isinstance(cmdinfo, list):
159            exit_codes = cmdinfo[1:]
160            cmd = cmdinfo[0]
161        else:
162            exit_codes = [0]
163            cmd = cmdinfo
164
165        if not cmd:
166            continue
167
168        (proc, foutput) = exec_cmd(args, pm, stage, cmd)
169
170        if proc and (proc.returncode not in exit_codes):
171            print('', file=sys.stderr)
172            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
173                  file=sys.stderr)
174            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
175                  file=sys.stderr)
176            print("returncode {}; expected {}".format(proc.returncode,
177                                                      exit_codes))
178            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
179            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
180            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
181            raise PluginMgrTestFail(
182                stage, output,
183                '"{}" did not complete successfully'.format(prefix))
184
185def run_one_test(pm, args, index, tidx):
186    global NAMES
187    result = True
188    tresult = ""
189    tap = ""
190    res = TestResult(tidx['id'], tidx['name'])
191    if args.verbose > 0:
192        print("\t====================\n=====> ", end="")
193    print("Test " + tidx["id"] + ": " + tidx["name"])
194
195    if 'skip' in tidx:
196        if tidx['skip'] == 'yes':
197            res = TestResult(tidx['id'], tidx['name'])
198            res.set_result(ResultState.skip)
199            res.set_errormsg('Test case designated as skipped.')
200            pm.call_pre_case(tidx['id'], tidx['name'], test_skip=True)
201            pm.call_post_execute()
202            return res
203
204    # populate NAMES with TESTID for this test
205    NAMES['TESTID'] = tidx['id']
206
207    pm.call_pre_case(tidx['id'], tidx['name'])
208    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
209
210    if (args.verbose > 0):
211        print('-----> execute stage')
212    pm.call_pre_execute()
213    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
214    if p:
215        exit_code = p.returncode
216    else:
217        exit_code = None
218
219    pm.call_post_execute()
220
221    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
222        print("exit: {!r}".format(exit_code))
223        print("exit: {}".format(int(tidx["expExitCode"])))
224        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
225        res.set_result(ResultState.fail)
226        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
227        print(procout)
228    else:
229        if args.verbose > 0:
230            print('-----> verify stage')
231        match_pattern = re.compile(
232            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
233        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
234        if procout:
235            match_index = re.findall(match_pattern, procout)
236            if len(match_index) != int(tidx["matchCount"]):
237                res.set_result(ResultState.fail)
238                res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
239            else:
240                res.set_result(ResultState.success)
241        elif int(tidx["matchCount"]) != 0:
242            res.set_result(ResultState.fail)
243            res.set_failmsg('No output generated by verify command.')
244        else:
245            res.set_result(ResultState.success)
246
247    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
248    pm.call_post_case()
249
250    index += 1
251
252    # remove TESTID from NAMES
253    del(NAMES['TESTID'])
254    return res
255
256def test_runner(pm, args, filtered_tests):
257    """
258    Driver function for the unit tests.
259
260    Prints information about the tests being run, executes the setup and
261    teardown commands and the command under test itself. Also determines
262    success/failure based on the information in the test case and generates
263    TAP output accordingly.
264    """
265    testlist = filtered_tests
266    tcount = len(testlist)
267    index = 1
268    tap = ''
269    badtest = None
270    stage = None
271    emergency_exit = False
272    emergency_exit_message = ''
273
274    tsr = TestSuiteReport()
275
276    try:
277        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
278    except Exception as ee:
279        ex_type, ex, ex_tb = sys.exc_info()
280        print('Exception {} {} (caught in pre_suite).'.
281              format(ex_type, ex))
282        traceback.print_tb(ex_tb)
283        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
284        emergency_exit = True
285        stage = 'pre-SUITE'
286
287    if emergency_exit:
288        pm.call_post_suite(index)
289        return emergency_exit_message
290    if args.verbose > 1:
291        print('give test rig 2 seconds to stabilize')
292    time.sleep(2)
293    for tidx in testlist:
294        if "flower" in tidx["category"] and args.device == None:
295            if args.verbose > 1:
296                print('Not executing test {} {} because DEV2 not defined'.
297                      format(tidx['id'], tidx['name']))
298            res = TestResult(tidx['id'], tidx['name'])
299            res.set_result(ResultState.skip)
300            res.set_errormsg('Not executed because DEV2 is not defined')
301            tsr.add_resultdata(res)
302            continue
303        try:
304            badtest = tidx  # in case it goes bad
305            res = run_one_test(pm, args, index, tidx)
306            tsr.add_resultdata(res)
307        except PluginMgrTestFail as pmtf:
308            ex_type, ex, ex_tb = sys.exc_info()
309            stage = pmtf.stage
310            message = pmtf.message
311            output = pmtf.output
312            res = TestResult(tidx['id'], tidx['name'])
313            res.set_result(ResultState.skip)
314            res.set_errormsg(pmtf.message)
315            res.set_failmsg(pmtf.output)
316            tsr.add_resultdata(res)
317            index += 1
318            print(message)
319            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
320                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
321            print('---------------')
322            print('traceback')
323            traceback.print_tb(ex_tb)
324            print('---------------')
325            if stage == 'teardown':
326                print('accumulated output for this test:')
327                if pmtf.output:
328                    print(pmtf.output)
329            print('---------------')
330            break
331        index += 1
332
333    # if we failed in setup or teardown,
334    # fill in the remaining tests with ok-skipped
335    count = index
336
337    if tcount + 1 != count:
338        for tidx in testlist[count - 1:]:
339            res = TestResult(tidx['id'], tidx['name'])
340            res.set_result(ResultState.skip)
341            msg = 'skipped - previous {} failed {} {}'.format(stage,
342                index, badtest.get('id', '--Unknown--'))
343            res.set_errormsg(msg)
344            tsr.add_resultdata(res)
345            count += 1
346
347    if args.pause:
348        print('Want to pause\nPress enter to continue ...')
349        if input(sys.stdin):
350            print('got something on stdin')
351
352    pm.call_post_suite(index)
353
354    return tsr
355
356def has_blank_ids(idlist):
357    """
358    Search the list for empty ID fields and return true/false accordingly.
359    """
360    return not(all(k for k in idlist))
361
362
363def load_from_file(filename):
364    """
365    Open the JSON file containing the test cases and return them
366    as list of ordered dictionary objects.
367    """
368    try:
369        with open(filename) as test_data:
370            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
371    except json.JSONDecodeError as jde:
372        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
373        testlist = list()
374    else:
375        idlist = get_id_list(testlist)
376        if (has_blank_ids(idlist)):
377            for k in testlist:
378                k['filename'] = filename
379    return testlist
380
381
382def args_parse():
383    """
384    Create the argument parser.
385    """
386    parser = argparse.ArgumentParser(description='Linux TC unit tests')
387    return parser
388
389
390def set_args(parser):
391    """
392    Set the command line arguments for tdc.
393    """
394    parser.add_argument(
395        '--outfile', type=str,
396        help='Path to the file in which results should be saved. ' +
397        'Default target is the current directory.')
398    parser.add_argument(
399        '-p', '--path', type=str,
400        help='The full path to the tc executable to use')
401    sg = parser.add_argument_group(
402        'selection', 'select which test cases: ' +
403        'files plus directories; filtered by categories plus testids')
404    ag = parser.add_argument_group(
405        'action', 'select action to perform on selected test cases')
406
407    sg.add_argument(
408        '-D', '--directory', nargs='+', metavar='DIR',
409        help='Collect tests from the specified directory(ies) ' +
410        '(default [tc-tests])')
411    sg.add_argument(
412        '-f', '--file', nargs='+', metavar='FILE',
413        help='Run tests from the specified file(s)')
414    sg.add_argument(
415        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
416        help='Run tests only from the specified category/ies, ' +
417        'or if no category/ies is/are specified, list known categories.')
418    sg.add_argument(
419        '-e', '--execute', nargs='+', metavar='ID',
420        help='Execute the specified test cases with specified IDs')
421    ag.add_argument(
422        '-l', '--list', action='store_true',
423        help='List all test cases, or those only within the specified category')
424    ag.add_argument(
425        '-s', '--show', action='store_true', dest='showID',
426        help='Display the selected test cases')
427    ag.add_argument(
428        '-i', '--id', action='store_true', dest='gen_id',
429        help='Generate ID numbers for new test cases')
430    parser.add_argument(
431        '-v', '--verbose', action='count', default=0,
432        help='Show the commands that are being run')
433    parser.add_argument(
434        '--format', default='tap', const='tap', nargs='?',
435        choices=['none', 'xunit', 'tap'],
436        help='Specify the format for test results. (Default: TAP)')
437    parser.add_argument('-d', '--device',
438                        help='Execute the test case in flower category')
439    parser.add_argument(
440        '-P', '--pause', action='store_true',
441        help='Pause execution just before post-suite stage')
442    return parser
443
444
445def check_default_settings(args, remaining, pm):
446    """
447    Process any arguments overriding the default settings,
448    and ensure the settings are correct.
449    """
450    # Allow for overriding specific settings
451    global NAMES
452
453    if args.path != None:
454        NAMES['TC'] = args.path
455    if args.device != None:
456        NAMES['DEV2'] = args.device
457    if 'TIMEOUT' not in NAMES:
458        NAMES['TIMEOUT'] = None
459    if not os.path.isfile(NAMES['TC']):
460        print("The specified tc path " + NAMES['TC'] + " does not exist.")
461        exit(1)
462
463    pm.call_check_args(args, remaining)
464
465
466def get_id_list(alltests):
467    """
468    Generate a list of all IDs in the test cases.
469    """
470    return [x["id"] for x in alltests]
471
472
473def check_case_id(alltests):
474    """
475    Check for duplicate test case IDs.
476    """
477    idl = get_id_list(alltests)
478    return [x for x in idl if idl.count(x) > 1]
479
480
481def does_id_exist(alltests, newid):
482    """
483    Check if a given ID already exists in the list of test cases.
484    """
485    idl = get_id_list(alltests)
486    return (any(newid == x for x in idl))
487
488
489def generate_case_ids(alltests):
490    """
491    If a test case has a blank ID field, generate a random hex ID for it
492    and then write the test cases back to disk.
493    """
494    import random
495    for c in alltests:
496        if (c["id"] == ""):
497            while True:
498                newid = str('{:04x}'.format(random.randrange(16**4)))
499                if (does_id_exist(alltests, newid)):
500                    continue
501                else:
502                    c['id'] = newid
503                    break
504
505    ufilename = []
506    for c in alltests:
507        if ('filename' in c):
508            ufilename.append(c['filename'])
509    ufilename = get_unique_item(ufilename)
510    for f in ufilename:
511        testlist = []
512        for t in alltests:
513            if 'filename' in t:
514                if t['filename'] == f:
515                    del t['filename']
516                    testlist.append(t)
517        outfile = open(f, "w")
518        json.dump(testlist, outfile, indent=4)
519        outfile.write("\n")
520        outfile.close()
521
522def filter_tests_by_id(args, testlist):
523    '''
524    Remove tests from testlist that are not in the named id list.
525    If id list is empty, return empty list.
526    '''
527    newlist = list()
528    if testlist and args.execute:
529        target_ids = args.execute
530
531        if isinstance(target_ids, list) and (len(target_ids) > 0):
532            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
533    return newlist
534
535def filter_tests_by_category(args, testlist):
536    '''
537    Remove tests from testlist that are not in a named category.
538    '''
539    answer = list()
540    if args.category and testlist:
541        test_ids = list()
542        for catg in set(args.category):
543            if catg == '+c':
544                continue
545            print('considering category {}'.format(catg))
546            for tc in testlist:
547                if catg in tc['category'] and tc['id'] not in test_ids:
548                    answer.append(tc)
549                    test_ids.append(tc['id'])
550
551    return answer
552
553def get_test_cases(args):
554    """
555    If a test case file is specified, retrieve tests from that file.
556    Otherwise, glob for all json files in subdirectories and load from
557    each one.
558    Also, if requested, filter by category, and add tests matching
559    certain ids.
560    """
561    import fnmatch
562
563    flist = []
564    testdirs = ['tc-tests']
565
566    if args.file:
567        # at least one file was specified - remove the default directory
568        testdirs = []
569
570        for ff in args.file:
571            if not os.path.isfile(ff):
572                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
573            else:
574                flist.append(os.path.abspath(ff))
575
576    if args.directory:
577        testdirs = args.directory
578
579    for testdir in testdirs:
580        for root, dirnames, filenames in os.walk(testdir):
581            for filename in fnmatch.filter(filenames, '*.json'):
582                candidate = os.path.abspath(os.path.join(root, filename))
583                if candidate not in testdirs:
584                    flist.append(candidate)
585
586    alltestcases = list()
587    for casefile in flist:
588        alltestcases = alltestcases + (load_from_file(casefile))
589
590    allcatlist = get_test_categories(alltestcases)
591    allidlist = get_id_list(alltestcases)
592
593    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
594    idtestcases = filter_tests_by_id(args, alltestcases)
595    cattestcases = filter_tests_by_category(args, alltestcases)
596
597    cat_ids = [x['id'] for x in cattestcases]
598    if args.execute:
599        if args.category:
600            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
601        else:
602            alltestcases = idtestcases
603    else:
604        if cat_ids:
605            alltestcases = cattestcases
606        else:
607            # just accept the existing value of alltestcases,
608            # which has been filtered by file/directory
609            pass
610
611    return allcatlist, allidlist, testcases_by_cats, alltestcases
612
613
614def set_operation_mode(pm, args):
615    """
616    Load the test case data and process remaining arguments to determine
617    what the script should do for this run, and call the appropriate
618    function.
619    """
620    ucat, idlist, testcases, alltests = get_test_cases(args)
621
622    if args.gen_id:
623        if (has_blank_ids(idlist)):
624            alltests = generate_case_ids(alltests)
625        else:
626            print("No empty ID fields found in test files.")
627        exit(0)
628
629    duplicate_ids = check_case_id(alltests)
630    if (len(duplicate_ids) > 0):
631        print("The following test case IDs are not unique:")
632        print(str(set(duplicate_ids)))
633        print("Please correct them before continuing.")
634        exit(1)
635
636    if args.showID:
637        for atest in alltests:
638            print_test_case(atest)
639        exit(0)
640
641    if isinstance(args.category, list) and (len(args.category) == 0):
642        print("Available categories:")
643        print_sll(ucat)
644        exit(0)
645
646    if args.list:
647        if args.list:
648            list_test_cases(alltests)
649            exit(0)
650
651    if len(alltests):
652        catresults = test_runner(pm, args, alltests)
653        if args.format == 'none':
654            print('Test results output suppression requested\n')
655        else:
656            print('\nAll test results: \n')
657            if args.format == 'xunit':
658                suffix = 'xml'
659                res = catresults.format_xunit()
660            elif args.format == 'tap':
661                suffix = 'tap'
662                res = catresults.format_tap()
663            print(res)
664            print('\n\n')
665            if not args.outfile:
666                fname = 'test-results.{}'.format(suffix)
667            else:
668                fname = args.outfile
669            with open(fname, 'w') as fh:
670                fh.write(res)
671                fh.close()
672                if os.getenv('SUDO_UID') is not None:
673                    os.chown(fname, uid=int(os.getenv('SUDO_UID')),
674                        gid=int(os.getenv('SUDO_GID')))
675    else:
676        print('No tests found\n')
677
678def main():
679    """
680    Start of execution; set up argument parser and get the arguments,
681    and start operations.
682    """
683    parser = args_parse()
684    parser = set_args(parser)
685    pm = PluginMgr(parser)
686    parser = pm.call_add_args(parser)
687    (args, remaining) = parser.parse_known_args()
688    args.NAMES = NAMES
689    check_default_settings(args, remaining, pm)
690    if args.verbose > 2:
691        print('args is {}'.format(args))
692
693    set_operation_mode(pm, args)
694
695    exit(0)
696
697
698if __name__ == "__main__":
699    main()
700