2 # SPDX-License-Identifier: GPL-2.0
5 tdc.py - Linux tc (Traffic Control) unit test driver
20 from multiprocessing import Pool
21 from collections import OrderedDict
22 from string import Template
24 from tdc_config import *
25 from tdc_helper import *
28 from TdcResults import *
class PluginDependencyException(Exception):
    """Raised when one or more plugins required by the selected test
    cases cannot be located on disk."""

    def __init__(self, missing_pg):
        # Collection of plugin names that could not be found.
        self.missing_pg = missing_pg
# Exception raised when a plugin-managed stage (setup/execute/verify/
# teardown) fails in a way that should abort the current test.
34 class PluginMgrTestFail(Exception):
35 def __init__(self, stage, output, message):
# message: human-readable description of the failure.
# NOTE(review): 'stage' and 'output' are presumably stored on self as
# well; the assigning lines are not visible in this excerpt -- confirm
# against the full file (test_runner reads pmtf.output).
38 self.message = message
# PluginMgr.__init__: remember the argument parser and scan the plugin
# directory for plugin modules to instantiate.
41 def __init__(self, argparser):
# Ordered collection of instantiated plugins.
44 self.plugin_instances = []
# Records plugins that failed to load.
45 self.failed_plugins = {}
46 self.argparser = argparser
# Plugin search root; overridable via the TDC_PLUGIN_DIR env var.
48 plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
49 for dirpath, dirnames, filenames in os.walk(plugindir):
# Skip the package marker, commented-out files and emacs lock files.
51 if (fn.endswith('.py') and
52 not fn == '__init__.py' and
53 not fn.startswith('#') and
54 not fn.startswith('.#')):
56 foo = importlib.import_module('plugins.' + mn)
# NOTE(review): plugin_instances is initialized as a list above, but is
# indexed here with a module-name string -- that would raise TypeError
# if this path is reached; confirm whether this is dead code upstream.
58 self.plugin_instances[mn] = foo.SubPlugin()
# Import the plugin module *pgname* from directory *pgdir* and register
# an instance of its SubPlugin class.
60 def load_plugin(self, pgdir, pgname):
# Remember the plugin by name so it is not loaded twice.
62 self.plugins.add(pgname)
64 foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
66 # nsPlugin must always be the first one
67 if pgname == "nsPlugin":
68 self.plugin_instances.insert(0, (pgname, foo.SubPlugin()))
69 self.plugin_instances[0][1].check_args(self.args, None)
# All other plugins are appended, preserving load order.
71 self.plugin_instances.append((pgname, foo.SubPlugin()))
72 self.plugin_instances[-1][1].check_args(self.args, None)
74 def get_required_plugins(self, testlist):
76 Get all required plugins from the list of test cases and return
# A test's 'plugins' entry may carry either a single plugin name or a
# list of names under the 'requires' key; normalize both into the set.
82 if 'requires' in t['plugins']:
83 if isinstance(t['plugins']['requires'], list):
84 reqs.update(set(t['plugins']['requires']))
86 reqs.add(t['plugins']['requires'])
# Flatten the per-test plugin spec down to the requirement value(s).
87 t['plugins'] = t['plugins']['requires']
96 def load_required_plugins(self, reqs, parser, args, remaining):
98 Get all required plugins from the list of test cases and load any plugin
99 that is not already enabled.
# Search order: stock plugin library first, then the custom directory.
101 pgd = ['plugin-lib', 'plugin-lib-custom']
105 if r not in self.plugins:
106 fname = '{}.py'.format(r)
109 pgpath = '{}/{}'.format(d, fname)
110 if os.path.isfile(pgpath):
111 source_path.append(pgpath)
112 if len(source_path) == 0:
113 print('ERROR: unable to find required plugin {}'.format(r))
116 elif len(source_path) > 1:
# NOTE(review): the '{}' placeholder below is never substituted --
# '.format(r)' appears to be missing from this print call.
117 print('WARNING: multiple copies of plugin {} found, using version found')
118 print('at {}'.format(source_path[0]))
119 pgdir = source_path[0]
# Keep only the top-level directory component as the import package.
120 pgdir = pgdir.split('/')[0]
121 self.load_plugin(pgdir, fname)
# Abort, reporting all plugins that could not be found.
123 raise PluginDependencyException(pnf)
# Newly loaded plugins may register extra CLI options; re-parse with
# the extended parser so they take effect.
125 parser = self.call_add_args(parser)
126 (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
def call_pre_suite(self, testcount, testidlist):
    """Invoke every plugin's pre_suite() hook, in load order."""
    for _, plugin in self.plugin_instances:
        plugin.pre_suite(testcount, testidlist)
def call_post_suite(self, index):
    """Invoke every plugin's post_suite() hook, in reverse load order."""
    for _, plugin in reversed(self.plugin_instances):
        plugin.post_suite(index)
# Run each required plugin's pre_case() hook for this test case.
# test_skip tells the plugin the case will be skipped, not executed.
137 def call_pre_case(self, caseinfo, *, test_skip=False):
138 for (pgn, pgn_inst) in self.plugin_instances:
# Only plugins the test actually requires participate.
139 if pgn not in caseinfo['plugins']:
142 pgn_inst.pre_case(caseinfo, test_skip)
143 except Exception as ee:
# Report which plugin and test failed before bailing out.
144 print('exception {} in call to pre_case for {} plugin'.
145 format(ee, pgn_inst.__class__))
146 print('testid is {}'.format(caseinfo['id']))
# Run the required plugins' post_case() cleanup hook, in reverse order.
149 def call_post_case(self, caseinfo):
150 for (pgn, pgn_inst) in reversed(self.plugin_instances):
# Skip plugins the test did not request.
151 if pgn not in caseinfo['plugins']:
# Notify required plugins that the command under test is about to run.
155 def call_pre_execute(self, caseinfo):
156 for (pgn, pgn_inst) in self.plugin_instances:
# Skip plugins the test did not request.
157 if pgn not in caseinfo['plugins']:
159 pgn_inst.pre_execute()
# Notify required plugins that the command under test has finished,
# in reverse load order.
161 def call_post_execute(self, caseinfo):
162 for (pgn, pgn_inst) in reversed(self.plugin_instances):
# Skip plugins the test did not request.
163 if pgn not in caseinfo['plugins']:
165 pgn_inst.post_execute()
# Let every plugin extend the argument parser with its own options.
# NOTE(review): the 'return parser' line is not visible in this excerpt
# but callers use the return value -- confirm against the full file.
167 def call_add_args(self, parser):
168 for (pgn, pgn_inst) in self.plugin_instances:
169 parser = pgn_inst.add_args(parser)
def call_check_args(self, args, remaining):
    """Give every plugin a chance to validate the parsed arguments."""
    for _, plugin in self.plugin_instances:
        plugin.check_args(args, remaining)
# Give each required plugin a chance to rewrite *command* for *stage*
# (e.g. wrapping it to run inside a network namespace).
176 def call_adjust_command(self, caseinfo, stage, command):
177 for (pgn, pgn_inst) in self.plugin_instances:
# Skip plugins the test did not request.
178 if pgn not in caseinfo['plugins']:
180 command = pgn_inst.adjust_command(stage, command)
# Store the parsed CLI arguments on the manager.
183 def set_args(self, args):
# Build a fresh top-level argument parser for tdc.
187 def _make_argparser(args):
188 self.argparser = argparse.ArgumentParser(
189 description='Linux TC unit tests')
191 def replace_keywords(cmd):
193 For a given executable command, substitute any known
194 variables contained within NAMES with the correct values
# safe_substitute leaves unknown $placeholders untouched instead of
# raising KeyError.
197 subcmd = tcmd.safe_substitute(NAMES)
201 def exec_cmd(caseinfo, args, pm, stage, command):
203 Perform any required modifications on an executable command, then run
204 it in a subprocess and return the results.
# Nothing to do for an empty or whitespace-only command.
206 if len(command.strip()) == 0:
# Expand $NAME placeholders, then let plugins rewrite the command.
209 command = replace_keywords(command)
211 command = pm.call_adjust_command(caseinfo, stage, command)
213 print('command "{}"'.format(command))
215 proc = subprocess.Popen(command,
217 stdout=subprocess.PIPE,
218 stderr=subprocess.PIPE,
# NAMES['TIMEOUT'] may be None, meaning wait indefinitely.
222 (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
# On failure prefer stderr as the reported output, falling back to
# stdout otherwise.
223 if proc.returncode != 0 and len(serr) > 0:
224 foutput = serr.decode("utf-8", errors="ignore")
226 foutput = rawout.decode("utf-8", errors="ignore")
227 except subprocess.TimeoutExpired:
228 foutput = "Command \"{}\" timed out\n".format(command)
# 255 marks a timed-out command for the caller's exit-code checks.
229 proc.returncode = 255
236 def prepare_env(caseinfo, args, pm, stage, prefix, cmdlist, output = None):
238 Execute the setup/teardown commands for a test case.
239 Optionally terminate test execution if the command fails.
242 print('{}'.format(prefix))
243 for cmdinfo in cmdlist:
# A list entry is [command, allowed_exit_code, ...]; anything else is a
# bare command string for which the accepted exit codes default.
244 if isinstance(cmdinfo, list):
245 exit_codes = cmdinfo[1:]
254 (proc, foutput) = exec_cmd(caseinfo, args, pm, stage, cmd)
# Any exit code outside the accepted set aborts the whole test run.
256 if proc and (proc.returncode not in exit_codes):
257 print('', file=sys.stderr)
258 print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
260 print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
262 print("returncode {}; expected {}".format(proc.returncode,
264 print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
265 print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
266 print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
# Propagate a stage failure so test_runner can mark remaining tests.
267 raise PluginMgrTestFail(
269 '"{}" did not complete successfully'.format(prefix))
# Compare the verify command's JSON output against the test case's
# 'matchJSON' specification, recording pass/fail in *res*.
271 def verify_by_json(procout, res, tidx, args, pm):
273 outputJSON = json.loads(procout)
274 except json.JSONDecodeError:
275 res.set_result(ResultState.fail)
276 res.set_failmsg('Cannot decode verify command\'s output. Is it JSON?')
# Round-trip the match spec so it is plain JSON-compatible data.
279 matchJSON = json.loads(json.dumps(tidx['matchJSON']))
# Top-level type mismatch is an immediate failure.
281 if type(outputJSON) != type(matchJSON):
282 failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {} '
283 failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
284 res.set_result(ResultState.fail)
285 res.set_failmsg(failmsg)
# The match spec may not contain more elements than the output.
288 if len(matchJSON) > len(outputJSON):
289 failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
290 failmsg = failmsg.format(len(outputJSON), outputJSON, len(matchJSON), matchJSON)
291 res.set_result(ResultState.fail)
292 res.set_failmsg(failmsg)
# Recursive structural comparison starts here.
294 res = find_in_json(res, outputJSON, matchJSON, 0)
# Recursively check that matchJSONVal is contained in outputJSONVal,
# dispatching on the match value's type (list/dict/scalar).
298 def find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
# Short-circuit: once a mismatch has been recorded, stop comparing.
299 if res.get_result() == ResultState.fail:
302 if type(matchJSONVal) == list:
303 res = find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey)
305 elif type(matchJSONVal) == dict:
306 res = find_in_json_dict(res, outputJSONVal, matchJSONVal)
308 res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
# No recorded failure after the walk means the match succeeded.
310 if res.get_result() != ResultState.fail:
311 res.set_result(ResultState.success)
# Element-wise comparison of a matchJSON list against the output list.
316 def find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
317 if (type(matchJSONVal) != type(outputJSONVal)):
318 failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
319 failmsg = failmsg.format(outputJSONVal, matchJSONVal)
320 res.set_result(ResultState.fail)
321 res.set_failmsg(failmsg)
# The match list may not be longer than the output list.
324 if len(matchJSONVal) > len(outputJSONVal):
325 failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
326 failmsg = failmsg.format(len(outputJSONVal), outputJSONVal, len(matchJSONVal), matchJSONVal)
327 res.set_result(ResultState.fail)
328 res.set_failmsg(failmsg)
# Compare each match element against the output element at the same
# index (the loop variable deliberately shadows matchJSONVal).
331 for matchJSONIdx, matchJSONVal in enumerate(matchJSONVal):
332 res = find_in_json(res, outputJSONVal[matchJSONIdx], matchJSONVal,
# Key-by-key comparison of a matchJSON dict against the output value.
336 def find_in_json_dict(res, outputJSONVal, matchJSONVal):
337 for matchJSONKey, matchJSONVal in matchJSONVal.items():
338 if type(outputJSONVal) == dict:
339 if matchJSONKey not in outputJSONVal:
340 failmsg = 'Key not found in json output: {}: {}\nMatching against output: {}'
341 failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal)
342 res.set_result(ResultState.fail)
343 res.set_failmsg(failmsg)
347 failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
# NOTE(review): 'outputJSON' and 'matchJSON' are not defined in this
# function (the locals are outputJSONVal/matchJSONVal) -- this branch
# would raise NameError if ever reached; confirm and fix upstream.
348 failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
349 res.set_result(ResultState.fail)
350 res.set_failmsg(failmsg)
# Recurse into nested non-empty containers under the matched key.
353 if type(outputJSONVal) == dict and (type(outputJSONVal[matchJSONKey]) == dict or
354 type(outputJSONVal[matchJSONKey]) == list):
355 if len(matchJSONVal) > 0:
356 res = find_in_json(res, outputJSONVal[matchJSONKey], matchJSONVal, matchJSONKey)
357 # handling corner case where matchJSONVal == [] or matchJSONVal == {}
359 res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
361 res = find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey)
# Leaf comparison: check a scalar match value against output[key].
364 def find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
365 if matchJSONKey in outputJSONVal:
366 if matchJSONVal != outputJSONVal[matchJSONKey]:
367 failmsg = 'Value doesn\'t match: {}: {} != {}\nMatching against output: {}'
368 failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal[matchJSONKey], outputJSONVal)
369 res.set_result(ResultState.fail)
370 res.set_failmsg(failmsg)
# Run a single test case through all stages (setup, execute, verify,
# teardown) and return its TestResult.
375 def run_one_test(pm, args, index, tidx):
# Saved so the per-test DUMMY name can be restored at the end.
380 dummy = NAMES['DUMMY']
384 res = TestResult(tidx['id'], tidx['name'])
386 print("\t====================\n=====> ", end="")
387 print("Test " + tidx["id"] + ": " + tidx["name"])
# Explicitly skipped test cases still run the skip-aware plugin hooks.
390 if tidx['skip'] == 'yes':
391 res = TestResult(tidx['id'], tidx['name'])
392 res.set_result(ResultState.skip)
393 res.set_errormsg('Test case designated as skipped.')
394 pm.call_pre_case(tidx, test_skip=True)
395 pm.call_post_execute(tidx)
# A 'dependsOn' probe command decides at runtime whether this test can
# run on the current system; nonzero exit means skip.
398 if 'dependsOn' in tidx:
399 if (args.verbose > 0):
400 print('probe command for test skip')
401 (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx['dependsOn'])
403 if (p.returncode != 0):
404 res = TestResult(tidx['id'], tidx['name'])
405 res.set_result(ResultState.skip)
406 res.set_errormsg('probe command: test skipped.')
407 pm.call_pre_case(tidx, test_skip=True)
408 pm.call_post_execute(tidx)
411 # populate NAMES with TESTID for this test
412 NAMES['TESTID'] = tidx['id']
# Make namespace and device names unique per test so parallel runs do
# not collide.
413 NAMES['NS'] = '{}-{}'.format(NAMES['NS'], tidx['random'])
414 NAMES['DEV0'] = '{}id{}'.format(NAMES['DEV0'], tidx['id'])
415 NAMES['DEV1'] = '{}id{}'.format(NAMES['DEV1'], tidx['id'])
416 NAMES['DUMMY'] = '{}id{}'.format(NAMES['DUMMY'], tidx['id'])
418 pm.call_pre_case(tidx)
419 prepare_env(tidx, args, pm, 'setup', "-----> prepare stage", tidx["setup"])
421 if (args.verbose > 0):
422 print('-----> execute stage')
423 pm.call_pre_execute(tidx)
424 (p, procout) = exec_cmd(tidx, args, pm, 'execute', tidx["cmdUnderTest"])
426 exit_code = p.returncode
430 pm.call_post_execute(tidx)
# First gate: the command under test must exit with the expected code.
432 if (exit_code is None or exit_code != int(tidx["expExitCode"])):
433 print("exit: {!r}".format(exit_code))
434 print("exit: {}".format(int(tidx["expExitCode"])))
435 #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
436 res.set_result(ResultState.fail)
437 res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
# Second gate: run the verify command and match its output either as
# JSON or against a regex pattern with an expected match count.
441 print('-----> verify stage')
442 (p, procout) = exec_cmd(tidx, args, pm, 'verify', tidx["verifyCmd"])
444 if 'matchJSON' in tidx:
445 verify_by_json(procout, res, tidx, args, pm)
446 elif 'matchPattern' in tidx:
447 match_pattern = re.compile(
448 str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
449 match_index = re.findall(match_pattern, procout)
450 if len(match_index) != int(tidx["matchCount"]):
451 res.set_result(ResultState.fail)
452 res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
454 res.set_result(ResultState.success)
456 res.set_result(ResultState.fail)
457 res.set_failmsg('Must specify a match option: matchJSON or matchPattern\n{}'.format(procout))
# No verify output: pass only if zero matches were expected.
458 elif int(tidx["matchCount"]) != 0:
459 res.set_result(ResultState.fail)
460 res.set_failmsg('No output generated by verify command.')
462 res.set_result(ResultState.success)
464 prepare_env(tidx, args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
465 pm.call_post_case(tidx)
469 # remove TESTID from NAMES
# Restore the per-test DUMMY name saved at entry.
476 NAMES['DUMMY'] = dummy
# Run the plugins' pre_suite stage; on any exception, record the
# failure and unwind via post_suite so plugins can still clean up.
480 def prepare_run(pm, args, testlist):
481 tcount = len(testlist)
482 emergency_exit = False
483 emergency_exit_message = ''
486 pm.call_pre_suite(tcount, testlist)
487 except Exception as ee:
488 ex_type, ex, ex_tb = sys.exc_info()
489 print('Exception {} {} (caught in pre_suite).'.
491 traceback.print_tb(ex_tb)
492 emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
493 emergency_exit = True
496 pm.call_post_suite(1)
# Empty string means the pre_suite stage completed normally.
497 return emergency_exit_message
def purge_run(pm, index):
    """Run only the plugins' post-suite stage, cleaning up after a run."""
    pm.call_post_suite(index)
502 def test_runner(pm, args, filtered_tests):
504 Driver function for the unit tests.
506 Prints information about the tests being run, executes the setup and
507 teardown commands and the command under test itself. Also determines
508 success/failure based on the information in the test case and generates
509 TAP output accordingly.
511 testlist = filtered_tests
512 tcount = len(testlist)
518 tsr = TestSuiteReport()
520 for tidx in testlist:
# 'flower' tests need a real NIC (DEV2); skip them when -d wasn't given.
521 if "flower" in tidx["category"] and args.device == None:
522 errmsg = "Tests using the DEV2 variable must define the name of a "
523 errmsg += "physical NIC with the -d option when running tdc.\n"
524 errmsg += "Test has been skipped."
527 res = TestResult(tidx['id'], tidx['name'])
528 res.set_result(ResultState.skip)
529 res.set_errormsg(errmsg)
530 tsr.add_resultdata(res)
534 badtest = tidx # in case it goes bad
535 res = run_one_test(pm, args, index, tidx)
536 tsr.add_resultdata(res)
# A stage failure in a plugin-managed stage is recorded as a failed
# test rather than aborting the whole run immediately.
537 except PluginMgrTestFail as pmtf:
538 ex_type, ex, ex_tb = sys.exc_info()
540 message = pmtf.message
542 res = TestResult(tidx['id'], tidx['name'])
543 res.set_result(ResultState.fail)
544 res.set_errormsg(pmtf.message)
545 res.set_failmsg(pmtf.output)
546 tsr.add_resultdata(res)
549 print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
550 format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
551 print('---------------')
553 traceback.print_tb(ex_tb)
554 print('---------------')
555 if stage == 'teardown':
556 print('accumulated output for this test:')
559 print('---------------')
563 # if we failed in setup or teardown,
564 # fill in the remaining tests with ok-skipped
# If the loop stopped early, mark every remaining test as skipped and
# name the test whose stage failure caused the abort.
567 if tcount + 1 != count:
568 for tidx in testlist[count - 1:]:
569 res = TestResult(tidx['id'], tidx['name'])
570 res.set_result(ResultState.skip)
571 msg = 'skipped - previous {} failed {} {}'.format(stage,
572 index, badtest.get('id', '--Unknown--'))
573 res.set_errormsg(msg)
574 tsr.add_resultdata(res)
# With --pause, wait for operator input before the post-suite stage.
578 print('Want to pause\nPress enter to continue ...')
580 print('got something on stdin')
# Split *alltests* into a bin that must run serially and a bin that can
# run in parallel worker processes.
584 def mp_bins(alltests):
588 for test in alltests:
# Tests without namespace isolation cannot safely run concurrently.
589 if 'nsPlugin' not in test['plugins']:
592 # We can only create one netdevsim device at a time
593 if 'netdevsim/new_device' in str(test['setup']):
596 parallel.append(test)
598 return (serial, parallel)
def __mp_runner(tests):
    """Worker entry point for the multiprocessing Pool.

    Runs *tests* with the module-global mp_pm/mp_args (presumably set
    before the pool is spawned, since these objects can't be pickled --
    confirm in the full file) and returns the raw result list.
    """
    outcome = test_runner(mp_pm, mp_args, tests)
    return outcome[1]._testsuite
# Run the suite with up to args.mp worker processes: the serial bin
# first, then the parallel tests in batches of 32.
604 def test_runner_mp(pm, args, alltests):
605 prepare_run(pm, args, alltests)
607 (serial, parallel) = mp_bins(alltests)
609 batches = [parallel[n : n + 32] for n in range(0, len(parallel), 32)]
# The serial bin is prepended as its own batch so it runs first.
610 batches.insert(0, serial)
612 print("Executing {} tests in parallel and {} in serial".format(len(parallel), len(serial)))
613 print("Using {} batches and {} workers".format(len(batches), args.mp))
615 # We can't pickle these objects so workaround them
622 with Pool(args.mp) as p:
623 pres = p.map(__mp_runner, batches)
# Flatten the per-batch results into a single suite report.
625 tsr = TestSuiteReport()
628 tsr.add_resultdata(res)
630 # Passing an index is not useful in MP
# Run the whole suite in-process, one test at a time.
635 def test_runner_serial(pm, args, alltests):
636 prepare_run(pm, args, alltests)
639 print("Executing {} tests in serial".format(len(alltests)))
641 (index, tsr) = test_runner(pm, args, alltests)
def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return any(not ident for ident in idlist)
654 def load_from_file(filename):
656 Open the JSON file containing the test cases and return them
657 as list of ordered dictionary objects.
660 with open(filename) as test_data:
# OrderedDict preserves the on-disk key order of each test case.
661 testlist = json.load(test_data, object_pairs_hook=OrderedDict)
662 except json.JSONDecodeError as jde:
# A malformed file is skipped rather than aborting the whole run.
663 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
666 idlist = get_id_list(testlist)
667 if (has_blank_ids(idlist)):
# Tag each case with its source file so new IDs can be written back.
669 k['filename'] = filename
# identity() is registered below as an argparse 'type' that returns the
# string unchanged.
672 def identity(string):
677 Create the argument parser.
679 parser = argparse.ArgumentParser(description='Linux TC unit tests')
# Register the identity type so argparse passes strings through as-is.
680 parser.register('type', None, identity)
684 def set_args(parser):
686 Set the command line arguments for tdc.
689 '--outfile', type=str,
690 help='Path to the file in which results should be saved. ' +
691 'Default target is the current directory.')
693 '-p', '--path', type=str,
694 help='The full path to the tc executable to use')
# Two option groups: 'selection' chooses which tests run, 'action'
# chooses what to do with them.
695 sg = parser.add_argument_group(
696 'selection', 'select which test cases: ' +
697 'files plus directories; filtered by categories plus testids')
698 ag = parser.add_argument_group(
699 'action', 'select action to perform on selected test cases')
702 '-D', '--directory', nargs='+', metavar='DIR',
703 help='Collect tests from the specified directory(ies) ' +
704 '(default [tc-tests])')
706 '-f', '--file', nargs='+', metavar='FILE',
707 help='Run tests from the specified file(s)')
709 '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
710 help='Run tests only from the specified category/ies, ' +
711 'or if no category/ies is/are specified, list known categories.')
713 '-e', '--execute', nargs='+', metavar='ID',
714 help='Execute the specified test cases with specified IDs')
716 '-l', '--list', action='store_true',
717 help='List all test cases, or those only within the specified category')
719 '-s', '--show', action='store_true', dest='showID',
720 help='Display the selected test cases')
722 '-i', '--id', action='store_true', dest='gen_id',
723 help='Generate ID numbers for new test cases')
725 '-v', '--verbose', action='count', default=0,
726 help='Show the commands that are being run')
728 '--format', default='tap', const='tap', nargs='?',
729 choices=['none', 'xunit', 'tap'],
730 help='Specify the format for test results. (Default: TAP)')
731 parser.add_argument('-d', '--device',
732 help='Execute test cases that use a physical device, ' +
733 'where DEVICE is its name. (If not defined, tests ' +
734 'that require a physical device will be skipped)')
736 '-P', '--pause', action='store_true',
737 help='Pause execution just before post-suite stage')
# -J selects the multiprocessing worker count (dest 'mp').
739 '-J', '--multiprocess', type=int, default=1, dest='mp',
740 help='Run tests in parallel whenever possible')
744 def check_default_settings(args, remaining, pm):
746 Process any arguments overriding the default settings,
747 and ensure the settings are correct.
749 # Allow for overriding specific settings
752 if args.path != None:
753 NAMES['TC'] = args.path
754 if args.device != None:
755 NAMES['DEV2'] = args.device
# No TIMEOUT entry means subprocesses may run unbounded.
756 if 'TIMEOUT' not in NAMES:
757 NAMES['TIMEOUT'] = None
758 if not os.path.isfile(NAMES['TC']):
759 print("The specified tc path " + NAMES['TC'] + " does not exist.")
# Finally let plugins validate any options they registered.
762 pm.call_check_args(args, remaining)
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [testcase["id"] for testcase in alltests]
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every occurrence of an ID that appears more than once.
    """
    idl = get_id_list(alltests)
    duplicates = [candidate for candidate in idl if idl.count(candidate) > 1]
    return duplicates
def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)
787 def generate_case_ids(alltests):
789 If a test case has a blank ID field, generate a random hex ID for it
790 and then write the test cases back to disk.
# Draw 4 random hex digits; the existence check below forces a retry
# when the candidate collides with an existing ID.
795 newid = str('{:04x}'.format(random.randrange(16**4)))
796 if (does_id_exist(alltests, newid)):
# Collect the distinct set of source files that need rewriting.
804 if ('filename' in c):
805 ufilename.append(c['filename'])
806 ufilename = get_unique_item(ufilename)
811 if t['filename'] == f:
814 outfile = open(f, "w")
815 json.dump(testlist, outfile, indent=4)
# NOTE(review): outfile is opened without a with-statement; the close()
# is presumably on a line not visible in this excerpt -- confirm.
819 def filter_tests_by_id(args, testlist):
821 Remove tests from testlist that are not in the named id list.
822 If id list is empty, return empty list.
825 if testlist and args.execute:
826 target_ids = args.execute
# Keep only the tests whose 'id' appears in the requested ID list.
828 if isinstance(target_ids, list) and (len(target_ids) > 0):
829 newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
832 def filter_tests_by_category(args, testlist):
834 Remove tests from testlist that are not in a named category.
837 if args.category and testlist:
# set() de-duplicates categories the user may have repeated.
839 for catg in set(args.category):
842 print('considering category {}'.format(catg))
# Track matched IDs so a test in several selected categories is only
# collected once.
844 if catg in tc['category'] and tc['id'] not in test_ids:
846 test_ids.append(tc['id'])
def set_random(alltests):
    """Attach a fresh 32-bit random tag to each test case under the
    'random' key (read later when building per-test resource names)."""
    for testcase in alltests:
        testcase['random'] = random.getrandbits(32)
854 def get_test_cases(args):
856 If a test case file is specified, retrieve tests from that file.
857 Otherwise, glob for all json files in subdirectories and load from
859 Also, if requested, filter by category, and add tests matching
865 testdirs = ['tc-tests']
868 # at least one file was specified - remove the default directory
872 if not os.path.isfile(ff):
873 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
875 flist.append(os.path.abspath(ff))
878 testdirs = args.directory
# Recursively collect every *.json test file under the chosen dirs.
880 for testdir in testdirs:
881 for root, dirnames, filenames in os.walk(testdir):
882 for filename in fnmatch.filter(filenames, '*.json'):
883 candidate = os.path.abspath(os.path.join(root, filename))
884 if candidate not in testdirs:
885 flist.append(candidate)
887 alltestcases = list()
888 for casefile in flist:
889 alltestcases = alltestcases + (load_from_file(casefile))
891 allcatlist = get_test_categories(alltestcases)
892 allidlist = get_id_list(alltestcases)
894 testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
895 idtestcases = filter_tests_by_id(args, alltestcases)
896 cattestcases = filter_tests_by_category(args, alltestcases)
898 cat_ids = [x['id'] for x in cattestcases]
# Combine the ID filter and the category filter without duplicating
# tests selected by both.
901 alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
903 alltestcases = idtestcases
906 alltestcases = cattestcases
908 # just accept the existing value of alltestcases,
909 # which has been filtered by file/directory
912 return allcatlist, allidlist, testcases_by_cats, alltestcases
915 def set_operation_mode(pm, parser, args, remaining):
917 Load the test case data and process remaining arguments to determine
918 what the script should do for this run, and call the appropriate
921 ucat, idlist, testcases, alltests = get_test_cases(args)
# With -i, fill in any blank test IDs and write the files back.
924 if (has_blank_ids(idlist)):
925 alltests = generate_case_ids(alltests)
927 print("No empty ID fields found in test files.")
# Duplicate IDs are a hard error: report them and stop.
930 duplicate_ids = check_case_id(alltests)
931 if (len(duplicate_ids) > 0):
932 print("The following test case IDs are not unique:")
933 print(str(set(duplicate_ids)))
934 print("Please correct them before continuing.")
938 for atest in alltests:
939 print_test_case(atest)
# -c with no category names just lists the known categories.
942 if isinstance(args.category, list) and (len(args.category) == 0):
943 print("Available categories:")
948 list_test_cases(alltests)
# Exit codes follow kselftest conventions (KSFT_*).
953 exit_code = 0 # KSFT_PASS
955 req_plugins = pm.get_required_plugins(alltests)
957 args = pm.load_required_plugins(req_plugins, parser, args, remaining)
958 except PluginDependencyException as pde:
959 print('The following plugins were not found:')
960 print('{}'.format(pde.missing_pg))
# Choose the multiprocess or serial runner based on the -J setting.
963 catresults = test_runner_mp(pm, args, alltests)
965 catresults = test_runner_serial(pm, args, alltests)
967 if catresults.count_failures() != 0:
968 exit_code = 1 # KSFT_FAIL
969 if args.format == 'none':
970 print('Test results output suppression requested\n')
972 print('\nAll test results: \n')
973 if args.format == 'xunit':
975 res = catresults.format_xunit()
976 elif args.format == 'tap':
978 res = catresults.format_tap()
982 fname = 'test-results.{}'.format(suffix)
985 with open(fname, 'w') as fh:
# When run via sudo, hand file ownership back to the invoking user.
988 if os.getenv('SUDO_UID') is not None:
989 os.chown(fname, uid=int(os.getenv('SUDO_UID')),
990 gid=int(os.getenv('SUDO_GID')))
992 print('No tests found\n')
993 exit_code = 4 # KSFT_SKIP
998 Start of execution; set up argument parser and get the arguments,
999 and start operations.
# NOTE(review): this check rejects any interpreter with minor < 8,
# which would wrongly refuse e.g. a hypothetical 4.0 -- consider
# comparing the (major, minor) tuple instead.
1003 if sys.version_info.major < 3 or sys.version_info.minor < 8:
1004 sys.exit("tdc requires at least python 3.8")
# Raise the open-file limit: parallel runs can hold many pipes open.
1006 resource.setrlimit(resource.RLIMIT_NOFILE, (1048576, 1048576))
1008 parser = args_parse()
1009 parser = set_args(parser)
1010 pm = PluginMgr(parser)
# Plugins may add their own options before the final parse.
1011 parser = pm.call_add_args(parser)
1012 (args, remaining) = parser.parse_known_args()
# Cap the worker count at 4.
1014 args.mp = min(args.mp, 4)
1016 check_default_settings(args, remaining, pm)
1017 if args.verbose > 2:
1018 print('args is {}'.format(args))
1021 set_operation_mode(pm, parser, args, remaining)
1022 except KeyboardInterrupt:
# Ctrl-C: still run the plugins' post-suite cleanup.
1024 pm.call_post_suite(None)
1026 if __name__ == "__main__":