sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qtest
+assert sys.version_info >= (3,6)
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
if os.environ.get('QEMU_IO_OPTIONS'):
qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
+qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
+if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
+ qemu_io_args_no_fmt += \
+ os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
+
qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
+sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
return subp.communicate()[0]
+def qemu_io_log(*args):
+ result = qemu_io(*args)
+ log(result, filters=[filter_testfiles, filter_qemu_io])
+ return result
+
def qemu_io_silent(*args):
'''Run qemu-io and return the exit code, suppressing stdout'''
args = qemu_io_args + list(args)
(-exitcode, ' '.join(args)))
return exitcode
+def qemu_io_silent_check(*args):
+ '''Run qemu-io and return the true if subprocess returned 0'''
+ args = qemu_io_args + list(args)
+ exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT)
+ return exitcode == 0
+
def get_virtio_scsi_device():
if qemu_default_machine == 's390-ccw-virtio':
return 'virtio-scsi-ccw'
else:
return exitcode, subp.communicate()[0]
+def qemu_nbd_popen(*args):
+ '''Run qemu-nbd in daemon mode and return the parent's exit code'''
+ return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
+
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
'''Return True if two image files are identical'''
return qemu_img('compare', '-f', fmt1,
return json.loads(r)['virtual-size']
def is_str(val):
- if sys.version_info.major >= 3:
- return isinstance(val, str)
- else:
- return isinstance(val, str) or isinstance(val, unicode)
+ return isinstance(val, str)
test_dir_re = re.compile(r"%s" % test_dir)
def filter_test_dir(msg):
qemu_img('create', img_path, '1G')
# migration_sock_path is automatically deleted
"""
- def __init__(self, names):
+ def __init__(self, names, base_dir=test_dir):
self.paths = []
for name in names:
- self.paths.append(os.path.join(test_dir, file_pattern(name)))
+ self.paths.append(os.path.join(base_dir, file_pattern(name)))
def __enter__(self):
return self.paths
"""
FilePath is a specialization of FilePaths that takes a single filename.
"""
- def __init__(self, name):
- super(FilePath, self).__init__([name])
+ def __init__(self, name, base_dir=test_dir):
+ super(FilePath, self).__init__([name], base_dir)
def __enter__(self):
return self.paths[0]
pass
-def file_path(*names):
+def file_path(*names, base_dir=test_dir):
''' Another way to get auto-generated filename that cleans itself up.
Use is as simple as:
paths = []
for name in names:
filename = file_pattern(name)
- path = os.path.join(test_dir, filename)
+ path = os.path.join(base_dir, filename)
file_path_remover.paths.append(path)
paths.append(path)
name = "qemu%s-%d" % (path_suffix, os.getpid())
super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
test_dir=test_dir,
- socket_scm_helper=socket_scm_helper)
+ socket_scm_helper=socket_scm_helper,
+ sock_dir=sock_dir)
self._num_drives = 0
def add_object(self, opts):
return x
return None
+ def query_bitmaps(self):
+ res = self.qmp("query-named-block-nodes")
+ return {device['node-name']: device['dirty-bitmaps']
+ for device in res['return'] if 'dirty-bitmaps' in device}
+
+ def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
+ """
+ get a specific bitmap from the object returned by query_bitmaps.
+ :param recording: If specified, filter results by the specified value.
+ :param bitmaps: If specified, use it instead of call query_bitmaps()
+ """
+ if bitmaps is None:
+ bitmaps = self.query_bitmaps()
+
+ for bitmap in bitmaps[node_name]:
+ if bitmap.get('name', '') == bitmap_name:
+ if recording is None:
+ return bitmap
+ elif bitmap.get('recording') == recording:
+ return bitmap
+ return None
+
+ def check_bitmap_status(self, node_name, bitmap_name, fields):
+ ret = self.get_bitmap(node_name, bitmap_name)
+
+ return fields.items() <= ret.items()
+
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
self.fail('no match for "%s" in %s' % (str(result), str(value)))
else:
self.assertEqual(result, value,
- 'values not equal "%s" and "%s"'
- % (str(result), str(value)))
+ '"%s" is "%s", expected "%s"'
+ % (path, str(result), str(value)))
def assert_no_active_block_jobs(self):
result = self.vm.qmp('query-block-jobs')
return self.pause_wait(job_id)
return result
+ def case_skip(self, reason):
+ '''Skip this test case'''
+ case_notrun(reason)
+ self.skipTest(reason)
+
def notrun(reason):
'''Skip this test suite'''
sys.exit(0)
def case_notrun(reason):
- '''Skip this test case'''
+ '''Mark this test case as not having been run (without actually
+ skipping it, that is left to the caller). See
+ QMPTestCase.case_skip() for a variant that actually skips the
+ current test case.'''
+
# Each test in qemu-iotests has a number ("seq")
seq = os.path.basename(sys.argv[0])
def supported_formats(read_only=False):
'''Set 'read_only' to True to check ro-whitelist
Otherwise, rw-whitelist is checked'''
- format_message = qemu_pipe("-drive", "format=help")
- line = 1 if read_only else 0
- return format_message.splitlines()[line].split(":")[1].split()
+
+ if not hasattr(supported_formats, "formats"):
+ supported_formats.formats = {}
+
+ if read_only not in supported_formats.formats:
+ format_message = qemu_pipe("-drive", "format=help")
+ line = 1 if read_only else 0
+ supported_formats.formats[read_only] = \
+ format_message.splitlines()[line].split(":")[1].split()
+
+ return supported_formats.formats[read_only]
def skip_if_unsupported(required_formats=[], read_only=False):
'''Skip Test Decorator
Runs the test if all the required formats are whitelisted'''
def skip_test_decorator(func):
- def func_wrapper(*args, **kwargs):
- usf_list = list(set(required_formats) -
- set(supported_formats(read_only)))
+ def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
+ if callable(required_formats):
+ fmts = required_formats(test_case)
+ else:
+ fmts = required_formats
+
+ usf_list = list(set(fmts) - set(supported_formats(read_only)))
if usf_list:
- case_notrun('{}: formats {} are not whitelisted'.format(
- args[0], usf_list))
+ test_case.case_skip('{}: formats {} are not whitelisted'.format(
+ test_case, usf_list))
else:
- return func(*args, **kwargs)
+ return func(test_case, *args, **kwargs)
return func_wrapper
return skip_test_decorator
+def skip_if_user_is_root(func):
+ '''Skip Test Decorator
+ Runs the test only without root permissions'''
+ def func_wrapper(*args, **kwargs):
+ if os.getuid() == 0:
+ case_notrun('{}: cannot be run as root'.format(args[0]))
+ else:
+ return func(*args, **kwargs)
+ return func_wrapper
+
def execute_unittest(output, verbosity, debug):
runner = unittest.TextTestRunner(stream=output, descriptions=True,
verbosity=verbosity)
unittest.main(testRunner=runner)
finally:
if not debug:
- sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s',
- r'Ran \1 tests', output.getvalue()))
+ out = output.getvalue()
+ out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
+
+ # Hide skipped tests from the reference output
+ out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
+ out_first_line, out_rest = out.split('\n', 1)
+ out = out_first_line.replace('s', '.') + '\n' + out_rest
+
+ sys.stderr.write(out)
def execute_test(test_function=None,
supported_fmts=[], supported_oses=['linux'],
- supported_cache_modes=[], unsupported_fmts=[]):
+ supported_cache_modes=[], unsupported_fmts=[],
+ supported_protocols=[], unsupported_protocols=[]):
"""Run either unittest or script-style tests."""
# We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
debug = '-d' in sys.argv
verbosity = 1
verify_image_format(supported_fmts, unsupported_fmts)
+ verify_protocol(supported_protocols, unsupported_protocols)
verify_platform(supported_oses)
verify_cache_mode(supported_cache_modes)
else:
# We need to filter out the time taken from the output so that
# qemu-iotest can reliably diff the results against master output.
- if sys.version_info.major >= 3:
- output = io.StringIO()
- else:
- # io.StringIO is for unicode strings, which is not what
- # 2.x's test runner emits.
- output = io.BytesIO()
+ output = io.StringIO()
logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))