if os.environ.get('QEMU_IO_OPTIONS'):
qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
+qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
+if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
+ qemu_io_args_no_fmt += \
+ os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
+
qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
+sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
return subp.communicate()[0]
+def qemu_io_log(*args):
+ result = qemu_io(*args)
+ log(result, filters=[filter_testfiles, filter_qemu_io])
+ return result
+
def qemu_io_silent(*args):
'''Run qemu-io and return the exit code, suppressing stdout'''
args = qemu_io_args + list(args)
qemu_img('create', img_path, '1G')
# migration_sock_path is automatically deleted
"""
- def __init__(self, names):
+ def __init__(self, names, base_dir=test_dir):
self.paths = []
for name in names:
- self.paths.append(os.path.join(test_dir, file_pattern(name)))
+ self.paths.append(os.path.join(base_dir, file_pattern(name)))
def __enter__(self):
return self.paths
"""
FilePath is a specialization of FilePaths that takes a single filename.
"""
- def __init__(self, name):
- super(FilePath, self).__init__([name])
+ def __init__(self, name, base_dir=test_dir):
+ super(FilePath, self).__init__([name], base_dir)
def __enter__(self):
return self.paths[0]
pass
-def file_path(*names):
+def file_path(*names, base_dir=test_dir):
''' Another way to get auto-generated filename that cleans itself up.
Use is as simple as:
paths = []
for name in names:
filename = file_pattern(name)
- path = os.path.join(test_dir, filename)
+ path = os.path.join(base_dir, filename)
file_path_remover.paths.append(path)
paths.append(path)
name = "qemu%s-%d" % (path_suffix, os.getpid())
super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
test_dir=test_dir,
- socket_scm_helper=socket_scm_helper)
+ socket_scm_helper=socket_scm_helper,
+ sock_dir=sock_dir)
self._num_drives = 0
def add_object(self, opts):
self.fail('no match for "%s" in %s' % (str(result), str(value)))
else:
self.assertEqual(result, value,
- 'values not equal "%s" and "%s"'
- % (str(result), str(value)))
+ '"%s" is "%s", expected "%s"'
+ % (path, str(result), str(value)))
def assert_no_active_block_jobs(self):
result = self.vm.qmp('query-block-jobs')
return self.pause_wait(job_id)
return result
+ def case_skip(self, reason):
+ '''Skip this test case'''
+ case_notrun(reason)
+ self.skipTest(reason)
+
def notrun(reason):
'''Skip this test suite'''
sys.exit(0)
def case_notrun(reason):
- '''Skip this test case'''
+ '''Mark this test case as not having been run (without actually
+ skipping it, that is left to the caller). See
+ QMPTestCase.case_skip() for a variant that actually skips the
+ current test case.'''
+
# Each test in qemu-iotests has a number ("seq")
seq = os.path.basename(sys.argv[0])
def supported_formats(read_only=False):
'''Set 'read_only' to True to check ro-whitelist
Otherwise, rw-whitelist is checked'''
- format_message = qemu_pipe("-drive", "format=help")
- line = 1 if read_only else 0
- return format_message.splitlines()[line].split(":")[1].split()
+
+ if not hasattr(supported_formats, "formats"):
+ supported_formats.formats = {}
+
+ if read_only not in supported_formats.formats:
+ format_message = qemu_pipe("-drive", "format=help")
+ line = 1 if read_only else 0
+ supported_formats.formats[read_only] = \
+ format_message.splitlines()[line].split(":")[1].split()
+
+ return supported_formats.formats[read_only]
def skip_if_unsupported(required_formats=[], read_only=False):
'''Skip Test Decorator
Runs the test if all the required formats are whitelisted'''
def skip_test_decorator(func):
- def func_wrapper(*args, **kwargs):
- usf_list = list(set(required_formats) -
- set(supported_formats(read_only)))
+ def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
+ if callable(required_formats):
+ fmts = required_formats(test_case)
+ else:
+ fmts = required_formats
+
+ usf_list = list(set(fmts) - set(supported_formats(read_only)))
if usf_list:
- case_notrun('{}: formats {} are not whitelisted'.format(
- args[0], usf_list))
+ test_case.case_skip('{}: formats {} are not whitelisted'.format(
+ test_case, usf_list))
else:
- return func(*args, **kwargs)
+ return func(test_case, *args, **kwargs)
return func_wrapper
return skip_test_decorator
+def skip_if_user_is_root(func):
+ '''Skip Test Decorator
+ Runs the test only without root permissions'''
+ def func_wrapper(*args, **kwargs):
+ if os.getuid() == 0:
+ case_notrun('{}: cannot be run as root'.format(args[0]))
+ else:
+ return func(*args, **kwargs)
+ return func_wrapper
+
def execute_unittest(output, verbosity, debug):
runner = unittest.TextTestRunner(stream=output, descriptions=True,
verbosity=verbosity)
unittest.main(testRunner=runner)
finally:
if not debug:
- sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s',
- r'Ran \1 tests', output.getvalue()))
+ out = output.getvalue()
+ out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
+
+ # Hide skipped tests from the reference output
+ out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
+ out_first_line, out_rest = out.split('\n', 1)
+ out = out_first_line.replace('s', '.') + '\n' + out_rest
+
+ sys.stderr.write(out)
def execute_test(test_function=None,
supported_fmts=[], supported_oses=['linux'],