]> Git Repo - linux.git/blob - tools/testing/selftests/tc-testing/tdc.py
scsi: zfcp: Trace when request remove fails after qdio send fails
[linux.git] / tools / testing / selftests / tc-testing / tdc.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
6
7 Copyright (C) 2017 Lucas Bates <[email protected]>
8 """
9
10 import re
11 import os
12 import sys
13 import argparse
14 import importlib
15 import json
16 import subprocess
17 import time
18 import traceback
19 from collections import OrderedDict
20 from string import Template
21
22 from tdc_config import *
23 from tdc_helper import *
24
25 import TdcPlugin
26 from TdcResults import *
27
28 class PluginDependencyException(Exception):
29     def __init__(self, missing_pg):
30         self.missing_pg = missing_pg
31
32 class PluginMgrTestFail(Exception):
33     def __init__(self, stage, output, message):
34         self.stage = stage
35         self.output = output
36         self.message = message
37
38 class PluginMgr:
39     def __init__(self, argparser):
40         super().__init__()
41         self.plugins = {}
42         self.plugin_instances = []
43         self.failed_plugins = {}
44         self.argparser = argparser
45
46         # TODO, put plugins in order
47         plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
48         for dirpath, dirnames, filenames in os.walk(plugindir):
49             for fn in filenames:
50                 if (fn.endswith('.py') and
51                     not fn == '__init__.py' and
52                     not fn.startswith('#') and
53                     not fn.startswith('.#')):
54                     mn = fn[0:-3]
55                     foo = importlib.import_module('plugins.' + mn)
56                     self.plugins[mn] = foo
57                     self.plugin_instances.append(foo.SubPlugin())
58
59     def load_plugin(self, pgdir, pgname):
60         pgname = pgname[0:-3]
61         foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
62         self.plugins[pgname] = foo
63         self.plugin_instances.append(foo.SubPlugin())
64         self.plugin_instances[-1].check_args(self.args, None)
65
66     def get_required_plugins(self, testlist):
67         '''
68         Get all required plugins from the list of test cases and return
69         all unique items.
70         '''
71         reqs = []
72         for t in testlist:
73             try:
74                 if 'requires' in t['plugins']:
75                     if isinstance(t['plugins']['requires'], list):
76                         reqs.extend(t['plugins']['requires'])
77                     else:
78                         reqs.append(t['plugins']['requires'])
79             except KeyError:
80                 continue
81         reqs = get_unique_item(reqs)
82         return reqs
83
84     def load_required_plugins(self, reqs, parser, args, remaining):
85         '''
86         Get all required plugins from the list of test cases and load any plugin
87         that is not already enabled.
88         '''
89         pgd = ['plugin-lib', 'plugin-lib-custom']
90         pnf = []
91
92         for r in reqs:
93             if r not in self.plugins:
94                 fname = '{}.py'.format(r)
95                 source_path = []
96                 for d in pgd:
97                     pgpath = '{}/{}'.format(d, fname)
98                     if os.path.isfile(pgpath):
99                         source_path.append(pgpath)
100                 if len(source_path) == 0:
101                     print('ERROR: unable to find required plugin {}'.format(r))
102                     pnf.append(fname)
103                     continue
104                 elif len(source_path) > 1:
105                     print('WARNING: multiple copies of plugin {} found, using version found')
106                     print('at {}'.format(source_path[0]))
107                 pgdir = source_path[0]
108                 pgdir = pgdir.split('/')[0]
109                 self.load_plugin(pgdir, fname)
110         if len(pnf) > 0:
111             raise PluginDependencyException(pnf)
112
113         parser = self.call_add_args(parser)
114         (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
115         return args
116
117     def call_pre_suite(self, testcount, testidlist):
118         for pgn_inst in self.plugin_instances:
119             pgn_inst.pre_suite(testcount, testidlist)
120
121     def call_post_suite(self, index):
122         for pgn_inst in reversed(self.plugin_instances):
123             pgn_inst.post_suite(index)
124
125     def call_pre_case(self, caseinfo, *, test_skip=False):
126         for pgn_inst in self.plugin_instances:
127             try:
128                 pgn_inst.pre_case(caseinfo, test_skip)
129             except Exception as ee:
130                 print('exception {} in call to pre_case for {} plugin'.
131                       format(ee, pgn_inst.__class__))
132                 print('test_ordinal is {}'.format(test_ordinal))
133                 print('testid is {}'.format(caseinfo['id']))
134                 raise
135
136     def call_post_case(self):
137         for pgn_inst in reversed(self.plugin_instances):
138             pgn_inst.post_case()
139
140     def call_pre_execute(self):
141         for pgn_inst in self.plugin_instances:
142             pgn_inst.pre_execute()
143
144     def call_post_execute(self):
145         for pgn_inst in reversed(self.plugin_instances):
146             pgn_inst.post_execute()
147
148     def call_add_args(self, parser):
149         for pgn_inst in self.plugin_instances:
150             parser = pgn_inst.add_args(parser)
151         return parser
152
153     def call_check_args(self, args, remaining):
154         for pgn_inst in self.plugin_instances:
155             pgn_inst.check_args(args, remaining)
156
157     def call_adjust_command(self, stage, command):
158         for pgn_inst in self.plugin_instances:
159             command = pgn_inst.adjust_command(stage, command)
160         return command
161
162     def set_args(self, args):
163         self.args = args
164
165     @staticmethod
166     def _make_argparser(args):
167         self.argparser = argparse.ArgumentParser(
168             description='Linux TC unit tests')
169
170 def replace_keywords(cmd):
171     """
172     For a given executable command, substitute any known
173     variables contained within NAMES with the correct values
174     """
175     tcmd = Template(cmd)
176     subcmd = tcmd.safe_substitute(NAMES)
177     return subcmd
178
179
180 def exec_cmd(args, pm, stage, command):
181     """
182     Perform any required modifications on an executable command, then run
183     it in a subprocess and return the results.
184     """
185     if len(command.strip()) == 0:
186         return None, None
187     if '$' in command:
188         command = replace_keywords(command)
189
190     command = pm.call_adjust_command(stage, command)
191     if args.verbose > 0:
192         print('command "{}"'.format(command))
193     proc = subprocess.Popen(command,
194         shell=True,
195         stdout=subprocess.PIPE,
196         stderr=subprocess.PIPE,
197         env=ENVIR)
198
199     try:
200         (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
201         if proc.returncode != 0 and len(serr) > 0:
202             foutput = serr.decode("utf-8", errors="ignore")
203         else:
204             foutput = rawout.decode("utf-8", errors="ignore")
205     except subprocess.TimeoutExpired:
206         foutput = "Command \"{}\" timed out\n".format(command)
207         proc.returncode = 255
208
209     proc.stdout.close()
210     proc.stderr.close()
211     return proc, foutput
212
213
214 def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
215     """
216     Execute the setup/teardown commands for a test case.
217     Optionally terminate test execution if the command fails.
218     """
219     if args.verbose > 0:
220         print('{}'.format(prefix))
221     for cmdinfo in cmdlist:
222         if isinstance(cmdinfo, list):
223             exit_codes = cmdinfo[1:]
224             cmd = cmdinfo[0]
225         else:
226             exit_codes = [0]
227             cmd = cmdinfo
228
229         if not cmd:
230             continue
231
232         (proc, foutput) = exec_cmd(args, pm, stage, cmd)
233
234         if proc and (proc.returncode not in exit_codes):
235             print('', file=sys.stderr)
236             print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
237                   file=sys.stderr)
238             print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
239                   file=sys.stderr)
240             print("returncode {}; expected {}".format(proc.returncode,
241                                                       exit_codes))
242             print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
243             print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
244             print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
245             raise PluginMgrTestFail(
246                 stage, output,
247                 '"{}" did not complete successfully'.format(prefix))
248
249 def verify_by_json(procout, res, tidx, args, pm):
250     try:
251         outputJSON = json.loads(procout)
252     except json.JSONDecodeError:
253         res.set_result(ResultState.fail)
254         res.set_failmsg('Cannot decode verify command\'s output. Is it JSON?')
255         return res
256
257     matchJSON = json.loads(json.dumps(tidx['matchJSON']))
258
259     if type(outputJSON) != type(matchJSON):
260         failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {} '
261         failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
262         res.set_result(ResultState.fail)
263         res.set_failmsg(failmsg)
264         return res
265
266     if len(matchJSON) > len(outputJSON):
267         failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
268         failmsg = failmsg.format(len(outputJSON), outputJSON, len(matchJSON), matchJSON)
269         res.set_result(ResultState.fail)
270         res.set_failmsg(failmsg)
271         return res
272     res = find_in_json(res, outputJSON, matchJSON, 0)
273
274     return res
275
276 def find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
277     if res.get_result() == ResultState.fail:
278         return res
279
280     if type(matchJSONVal) == list:
281         res = find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey)
282
283     elif type(matchJSONVal) == dict:
284         res = find_in_json_dict(res, outputJSONVal, matchJSONVal)
285     else:
286         res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
287
288     if res.get_result() != ResultState.fail:
289         res.set_result(ResultState.success)
290         return res
291
292     return res
293
294 def find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
295     if (type(matchJSONVal) != type(outputJSONVal)):
296         failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
297         failmsg = failmsg.format(outputJSONVal, matchJSONVal)
298         res.set_result(ResultState.fail)
299         res.set_failmsg(failmsg)
300         return res
301
302     if len(matchJSONVal) > len(outputJSONVal):
303         failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
304         failmsg = failmsg.format(len(outputJSONVal), outputJSONVal, len(matchJSONVal), matchJSONVal)
305         res.set_result(ResultState.fail)
306         res.set_failmsg(failmsg)
307         return res
308
309     for matchJSONIdx, matchJSONVal in enumerate(matchJSONVal):
310         res = find_in_json(res, outputJSONVal[matchJSONIdx], matchJSONVal,
311                            matchJSONKey)
312     return res
313
314 def find_in_json_dict(res, outputJSONVal, matchJSONVal):
315     for matchJSONKey, matchJSONVal in matchJSONVal.items():
316         if type(outputJSONVal) == dict:
317             if matchJSONKey not in outputJSONVal:
318                 failmsg = 'Key not found in json output: {}: {}\nMatching against output: {}'
319                 failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal)
320                 res.set_result(ResultState.fail)
321                 res.set_failmsg(failmsg)
322                 return res
323
324         else:
325             failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
326             failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
327             res.set_result(ResultState.fail)
328             res.set_failmsg(failmsg)
329             return rest
330
331         if type(outputJSONVal) == dict and (type(outputJSONVal[matchJSONKey]) == dict or
332                 type(outputJSONVal[matchJSONKey]) == list):
333             if len(matchJSONVal) > 0:
334                 res = find_in_json(res, outputJSONVal[matchJSONKey], matchJSONVal, matchJSONKey)
335             # handling corner case where matchJSONVal == [] or matchJSONVal == {}
336             else:
337                 res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
338         else:
339             res = find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey)
340     return res
341
342 def find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
343     if matchJSONKey in outputJSONVal:
344         if matchJSONVal != outputJSONVal[matchJSONKey]:
345             failmsg = 'Value doesn\'t match: {}: {} != {}\nMatching against output: {}'
346             failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal[matchJSONKey], outputJSONVal)
347             res.set_result(ResultState.fail)
348             res.set_failmsg(failmsg)
349             return res
350
351     return res
352
353 def run_one_test(pm, args, index, tidx):
354     global NAMES
355     result = True
356     tresult = ""
357     tap = ""
358     res = TestResult(tidx['id'], tidx['name'])
359     if args.verbose > 0:
360         print("\t====================\n=====> ", end="")
361     print("Test " + tidx["id"] + ": " + tidx["name"])
362
363     if 'skip' in tidx:
364         if tidx['skip'] == 'yes':
365             res = TestResult(tidx['id'], tidx['name'])
366             res.set_result(ResultState.skip)
367             res.set_errormsg('Test case designated as skipped.')
368             pm.call_pre_case(tidx, test_skip=True)
369             pm.call_post_execute()
370             return res
371
372     # populate NAMES with TESTID for this test
373     NAMES['TESTID'] = tidx['id']
374
375     pm.call_pre_case(tidx)
376     prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
377
378     if (args.verbose > 0):
379         print('-----> execute stage')
380     pm.call_pre_execute()
381     (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
382     if p:
383         exit_code = p.returncode
384     else:
385         exit_code = None
386
387     pm.call_post_execute()
388
389     if (exit_code is None or exit_code != int(tidx["expExitCode"])):
390         print("exit: {!r}".format(exit_code))
391         print("exit: {}".format(int(tidx["expExitCode"])))
392         #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
393         res.set_result(ResultState.fail)
394         res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
395         print(procout)
396     else:
397         if args.verbose > 0:
398             print('-----> verify stage')
399         (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
400         if procout:
401             if 'matchJSON' in tidx:
402                 verify_by_json(procout, res, tidx, args, pm)
403             elif 'matchPattern' in tidx:
404                 match_pattern = re.compile(
405                     str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
406                 match_index = re.findall(match_pattern, procout)
407                 if len(match_index) != int(tidx["matchCount"]):
408                     res.set_result(ResultState.fail)
409                     res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
410                 else:
411                     res.set_result(ResultState.success)
412             else:
413                 res.set_result(ResultState.fail)
414                 res.set_failmsg('Must specify a match option: matchJSON or matchPattern\n{}'.format(procout))
415         elif int(tidx["matchCount"]) != 0:
416             res.set_result(ResultState.fail)
417             res.set_failmsg('No output generated by verify command.')
418         else:
419             res.set_result(ResultState.success)
420
421     prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
422     pm.call_post_case()
423
424     index += 1
425
426     # remove TESTID from NAMES
427     del(NAMES['TESTID'])
428     return res
429
430 def test_runner(pm, args, filtered_tests):
431     """
432     Driver function for the unit tests.
433
434     Prints information about the tests being run, executes the setup and
435     teardown commands and the command under test itself. Also determines
436     success/failure based on the information in the test case and generates
437     TAP output accordingly.
438     """
439     testlist = filtered_tests
440     tcount = len(testlist)
441     index = 1
442     tap = ''
443     badtest = None
444     stage = None
445     emergency_exit = False
446     emergency_exit_message = ''
447
448     tsr = TestSuiteReport()
449
450     try:
451         pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
452     except Exception as ee:
453         ex_type, ex, ex_tb = sys.exc_info()
454         print('Exception {} {} (caught in pre_suite).'.
455               format(ex_type, ex))
456         traceback.print_tb(ex_tb)
457         emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
458         emergency_exit = True
459         stage = 'pre-SUITE'
460
461     if emergency_exit:
462         pm.call_post_suite(index)
463         return emergency_exit_message
464     if args.verbose > 1:
465         print('give test rig 2 seconds to stabilize')
466     time.sleep(2)
467     for tidx in testlist:
468         if "flower" in tidx["category"] and args.device == None:
469             errmsg = "Tests using the DEV2 variable must define the name of a "
470             errmsg += "physical NIC with the -d option when running tdc.\n"
471             errmsg += "Test has been skipped."
472             if args.verbose > 1:
473                 print(errmsg)
474             res = TestResult(tidx['id'], tidx['name'])
475             res.set_result(ResultState.skip)
476             res.set_errormsg(errmsg)
477             tsr.add_resultdata(res)
478             index += 1
479             continue
480         try:
481             badtest = tidx  # in case it goes bad
482             res = run_one_test(pm, args, index, tidx)
483             tsr.add_resultdata(res)
484         except PluginMgrTestFail as pmtf:
485             ex_type, ex, ex_tb = sys.exc_info()
486             stage = pmtf.stage
487             message = pmtf.message
488             output = pmtf.output
489             res = TestResult(tidx['id'], tidx['name'])
490             res.set_result(ResultState.skip)
491             res.set_errormsg(pmtf.message)
492             res.set_failmsg(pmtf.output)
493             tsr.add_resultdata(res)
494             index += 1
495             print(message)
496             print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
497                   format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
498             print('---------------')
499             print('traceback')
500             traceback.print_tb(ex_tb)
501             print('---------------')
502             if stage == 'teardown':
503                 print('accumulated output for this test:')
504                 if pmtf.output:
505                     print(pmtf.output)
506             print('---------------')
507             break
508         index += 1
509
510     # if we failed in setup or teardown,
511     # fill in the remaining tests with ok-skipped
512     count = index
513
514     if tcount + 1 != count:
515         for tidx in testlist[count - 1:]:
516             res = TestResult(tidx['id'], tidx['name'])
517             res.set_result(ResultState.skip)
518             msg = 'skipped - previous {} failed {} {}'.format(stage,
519                 index, badtest.get('id', '--Unknown--'))
520             res.set_errormsg(msg)
521             tsr.add_resultdata(res)
522             count += 1
523
524     if args.pause:
525         print('Want to pause\nPress enter to continue ...')
526         if input(sys.stdin):
527             print('got something on stdin')
528
529     pm.call_post_suite(index)
530
531     return tsr
532
533 def has_blank_ids(idlist):
534     """
535     Search the list for empty ID fields and return true/false accordingly.
536     """
537     return not(all(k for k in idlist))
538
539
540 def load_from_file(filename):
541     """
542     Open the JSON file containing the test cases and return them
543     as list of ordered dictionary objects.
544     """
545     try:
546         with open(filename) as test_data:
547             testlist = json.load(test_data, object_pairs_hook=OrderedDict)
548     except json.JSONDecodeError as jde:
549         print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
550         testlist = list()
551     else:
552         idlist = get_id_list(testlist)
553         if (has_blank_ids(idlist)):
554             for k in testlist:
555                 k['filename'] = filename
556     return testlist
557
558
559 def args_parse():
560     """
561     Create the argument parser.
562     """
563     parser = argparse.ArgumentParser(description='Linux TC unit tests')
564     return parser
565
566
567 def set_args(parser):
568     """
569     Set the command line arguments for tdc.
570     """
571     parser.add_argument(
572         '--outfile', type=str,
573         help='Path to the file in which results should be saved. ' +
574         'Default target is the current directory.')
575     parser.add_argument(
576         '-p', '--path', type=str,
577         help='The full path to the tc executable to use')
578     sg = parser.add_argument_group(
579         'selection', 'select which test cases: ' +
580         'files plus directories; filtered by categories plus testids')
581     ag = parser.add_argument_group(
582         'action', 'select action to perform on selected test cases')
583
584     sg.add_argument(
585         '-D', '--directory', nargs='+', metavar='DIR',
586         help='Collect tests from the specified directory(ies) ' +
587         '(default [tc-tests])')
588     sg.add_argument(
589         '-f', '--file', nargs='+', metavar='FILE',
590         help='Run tests from the specified file(s)')
591     sg.add_argument(
592         '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
593         help='Run tests only from the specified category/ies, ' +
594         'or if no category/ies is/are specified, list known categories.')
595     sg.add_argument(
596         '-e', '--execute', nargs='+', metavar='ID',
597         help='Execute the specified test cases with specified IDs')
598     ag.add_argument(
599         '-l', '--list', action='store_true',
600         help='List all test cases, or those only within the specified category')
601     ag.add_argument(
602         '-s', '--show', action='store_true', dest='showID',
603         help='Display the selected test cases')
604     ag.add_argument(
605         '-i', '--id', action='store_true', dest='gen_id',
606         help='Generate ID numbers for new test cases')
607     parser.add_argument(
608         '-v', '--verbose', action='count', default=0,
609         help='Show the commands that are being run')
610     parser.add_argument(
611         '--format', default='tap', const='tap', nargs='?',
612         choices=['none', 'xunit', 'tap'],
613         help='Specify the format for test results. (Default: TAP)')
614     parser.add_argument('-d', '--device',
615                         help='Execute test cases that use a physical device, ' +
616                         'where DEVICE is its name. (If not defined, tests ' +
617                         'that require a physical device will be skipped)')
618     parser.add_argument(
619         '-P', '--pause', action='store_true',
620         help='Pause execution just before post-suite stage')
621     return parser
622
623
624 def check_default_settings(args, remaining, pm):
625     """
626     Process any arguments overriding the default settings,
627     and ensure the settings are correct.
628     """
629     # Allow for overriding specific settings
630     global NAMES
631
632     if args.path != None:
633         NAMES['TC'] = args.path
634     if args.device != None:
635         NAMES['DEV2'] = args.device
636     if 'TIMEOUT' not in NAMES:
637         NAMES['TIMEOUT'] = None
638     if not os.path.isfile(NAMES['TC']):
639         print("The specified tc path " + NAMES['TC'] + " does not exist.")
640         exit(1)
641
642     pm.call_check_args(args, remaining)
643
644
645 def get_id_list(alltests):
646     """
647     Generate a list of all IDs in the test cases.
648     """
649     return [x["id"] for x in alltests]
650
651
652 def check_case_id(alltests):
653     """
654     Check for duplicate test case IDs.
655     """
656     idl = get_id_list(alltests)
657     return [x for x in idl if idl.count(x) > 1]
658
659
660 def does_id_exist(alltests, newid):
661     """
662     Check if a given ID already exists in the list of test cases.
663     """
664     idl = get_id_list(alltests)
665     return (any(newid == x for x in idl))
666
667
668 def generate_case_ids(alltests):
669     """
670     If a test case has a blank ID field, generate a random hex ID for it
671     and then write the test cases back to disk.
672     """
673     import random
674     for c in alltests:
675         if (c["id"] == ""):
676             while True:
677                 newid = str('{:04x}'.format(random.randrange(16**4)))
678                 if (does_id_exist(alltests, newid)):
679                     continue
680                 else:
681                     c['id'] = newid
682                     break
683
684     ufilename = []
685     for c in alltests:
686         if ('filename' in c):
687             ufilename.append(c['filename'])
688     ufilename = get_unique_item(ufilename)
689     for f in ufilename:
690         testlist = []
691         for t in alltests:
692             if 'filename' in t:
693                 if t['filename'] == f:
694                     del t['filename']
695                     testlist.append(t)
696         outfile = open(f, "w")
697         json.dump(testlist, outfile, indent=4)
698         outfile.write("\n")
699         outfile.close()
700
701 def filter_tests_by_id(args, testlist):
702     '''
703     Remove tests from testlist that are not in the named id list.
704     If id list is empty, return empty list.
705     '''
706     newlist = list()
707     if testlist and args.execute:
708         target_ids = args.execute
709
710         if isinstance(target_ids, list) and (len(target_ids) > 0):
711             newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
712     return newlist
713
714 def filter_tests_by_category(args, testlist):
715     '''
716     Remove tests from testlist that are not in a named category.
717     '''
718     answer = list()
719     if args.category and testlist:
720         test_ids = list()
721         for catg in set(args.category):
722             if catg == '+c':
723                 continue
724             print('considering category {}'.format(catg))
725             for tc in testlist:
726                 if catg in tc['category'] and tc['id'] not in test_ids:
727                     answer.append(tc)
728                     test_ids.append(tc['id'])
729
730     return answer
731
732
733 def get_test_cases(args):
734     """
735     If a test case file is specified, retrieve tests from that file.
736     Otherwise, glob for all json files in subdirectories and load from
737     each one.
738     Also, if requested, filter by category, and add tests matching
739     certain ids.
740     """
741     import fnmatch
742
743     flist = []
744     testdirs = ['tc-tests']
745
746     if args.file:
747         # at least one file was specified - remove the default directory
748         testdirs = []
749
750         for ff in args.file:
751             if not os.path.isfile(ff):
752                 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
753             else:
754                 flist.append(os.path.abspath(ff))
755
756     if args.directory:
757         testdirs = args.directory
758
759     for testdir in testdirs:
760         for root, dirnames, filenames in os.walk(testdir):
761             for filename in fnmatch.filter(filenames, '*.json'):
762                 candidate = os.path.abspath(os.path.join(root, filename))
763                 if candidate not in testdirs:
764                     flist.append(candidate)
765
766     alltestcases = list()
767     for casefile in flist:
768         alltestcases = alltestcases + (load_from_file(casefile))
769
770     allcatlist = get_test_categories(alltestcases)
771     allidlist = get_id_list(alltestcases)
772
773     testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
774     idtestcases = filter_tests_by_id(args, alltestcases)
775     cattestcases = filter_tests_by_category(args, alltestcases)
776
777     cat_ids = [x['id'] for x in cattestcases]
778     if args.execute:
779         if args.category:
780             alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
781         else:
782             alltestcases = idtestcases
783     else:
784         if cat_ids:
785             alltestcases = cattestcases
786         else:
787             # just accept the existing value of alltestcases,
788             # which has been filtered by file/directory
789             pass
790
791     return allcatlist, allidlist, testcases_by_cats, alltestcases
792
793
794 def set_operation_mode(pm, parser, args, remaining):
795     """
796     Load the test case data and process remaining arguments to determine
797     what the script should do for this run, and call the appropriate
798     function.
799     """
800     ucat, idlist, testcases, alltests = get_test_cases(args)
801
802     if args.gen_id:
803         if (has_blank_ids(idlist)):
804             alltests = generate_case_ids(alltests)
805         else:
806             print("No empty ID fields found in test files.")
807         exit(0)
808
809     duplicate_ids = check_case_id(alltests)
810     if (len(duplicate_ids) > 0):
811         print("The following test case IDs are not unique:")
812         print(str(set(duplicate_ids)))
813         print("Please correct them before continuing.")
814         exit(1)
815
816     if args.showID:
817         for atest in alltests:
818             print_test_case(atest)
819         exit(0)
820
821     if isinstance(args.category, list) and (len(args.category) == 0):
822         print("Available categories:")
823         print_sll(ucat)
824         exit(0)
825
826     if args.list:
827         list_test_cases(alltests)
828         exit(0)
829
830     exit_code = 0 # KSFT_PASS
831     if len(alltests):
832         req_plugins = pm.get_required_plugins(alltests)
833         try:
834             args = pm.load_required_plugins(req_plugins, parser, args, remaining)
835         except PluginDependencyException as pde:
836             print('The following plugins were not found:')
837             print('{}'.format(pde.missing_pg))
838         catresults = test_runner(pm, args, alltests)
839         if catresults.count_failures() != 0:
840             exit_code = 1 # KSFT_FAIL
841         if args.format == 'none':
842             print('Test results output suppression requested\n')
843         else:
844             print('\nAll test results: \n')
845             if args.format == 'xunit':
846                 suffix = 'xml'
847                 res = catresults.format_xunit()
848             elif args.format == 'tap':
849                 suffix = 'tap'
850                 res = catresults.format_tap()
851             print(res)
852             print('\n\n')
853             if not args.outfile:
854                 fname = 'test-results.{}'.format(suffix)
855             else:
856                 fname = args.outfile
857             with open(fname, 'w') as fh:
858                 fh.write(res)
859                 fh.close()
860                 if os.getenv('SUDO_UID') is not None:
861                     os.chown(fname, uid=int(os.getenv('SUDO_UID')),
862                         gid=int(os.getenv('SUDO_GID')))
863     else:
864         print('No tests found\n')
865         exit_code = 4 # KSFT_SKIP
866     exit(exit_code)
867
868 def main():
869     """
870     Start of execution; set up argument parser and get the arguments,
871     and start operations.
872     """
873     parser = args_parse()
874     parser = set_args(parser)
875     pm = PluginMgr(parser)
876     parser = pm.call_add_args(parser)
877     (args, remaining) = parser.parse_known_args()
878     args.NAMES = NAMES
879     pm.set_args(args)
880     check_default_settings(args, remaining, pm)
881     if args.verbose > 2:
882         print('args is {}'.format(args))
883
884     set_operation_mode(pm, parser, args, remaining)
885
886 if __name__ == "__main__":
887     main()
This page took 0.090184 seconds and 4 git commands to generate.