import io
from collections import OrderedDict
-sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
-import qtest
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
+from qemu import qtest
# This will not work if arguments contain spaces but is necessary if we
sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
-def ordered_qmp(qmsg):
+def ordered_qmp(qmsg, conv_keys=True):
# Dictionaries are not ordered prior to 3.6, therefore:
if isinstance(qmsg, list):
return [ordered_qmp(atom) for atom in qmsg]
if isinstance(qmsg, dict):
od = OrderedDict()
for k, v in sorted(qmsg.items()):
- od[k] = ordered_qmp(v)
+ if conv_keys:
+ k = k.replace('_', '-')
+ od[k] = ordered_qmp(v, conv_keys=False)
return od
return qmsg
r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
return json.loads(r)['virtual-size']
+def is_str(val):
+ if sys.version_info.major >= 3:
+ return isinstance(val, str)
+ else:
+ return isinstance(val, str) or isinstance(val, unicode)
+
test_dir_re = re.compile(r"%s" % test_dir)
def filter_test_dir(msg):
return test_dir_re.sub("TEST_DIR", msg)
def filter_qmp_testfiles(qmsg):
def _filter(key, value):
- if key == 'filename' or key == 'backing-file':
+ if is_str(value):
return filter_testfiles(value)
return value
return filter_qmp(qmsg, _filter)
lines.append(line)
return '\n'.join(lines)
+def filter_imgfmt(msg):
+ return msg.replace(imgfmt, 'IMGFMT')
+
+def filter_qmp_imgfmt(qmsg):
+ def _filter(key, value):
+ if is_str(value):
+ return filter_imgfmt(value)
+ return value
+ return filter_qmp(qmsg, _filter)
+
def log(msg, filters=[], indent=None):
'''Logs either a string message or a JSON serializable message (like QMP).
If indent is provided, JSON serializable messages are pretty-printed.'''
log(result, filters, indent=indent)
return result
+ # Returns None on success, and an error string on failure
def run_job(self, job, auto_finalize=True, auto_dismiss=False):
+ error = None
while True:
for ev in self.get_qmp_events_filtered(wait=True):
if ev['event'] == 'JOB_STATUS_CHANGE':
result = self.qmp('query-jobs')
for j in result['return']:
if j['id'] == job:
+ error = j['error']
log('Job failed: %s' % (j['error']))
elif status == 'pending' and not auto_finalize:
self.qmp_log('job-finalize', id=job)
elif status == 'concluded' and not auto_dismiss:
self.qmp_log('job-dismiss', id=job)
elif status == 'null':
- return
+ return error
else:
iotests.log(ev)
+ def node_info(self, node_name):
+ nodes = self.qmp('query-named-block-nodes')
+ for x in nodes['return']:
+ if x['node-name'] == node_name:
+ return x
+ return None
+
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
# Each test in qemu-iotests has a number ("seq")
seq = os.path.basename(sys.argv[0])
- open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
+ open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
print('%s not run: %s' % (seq, reason))
sys.exit(0)
+def case_notrun(reason):
+ '''Skip this test case'''
+ # Each test in qemu-iotests has a number ("seq")
+ seq = os.path.basename(sys.argv[0])
+
+ open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
+ ' [case not run] ' + reason + '\n')
+
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
assert not (supported_fmts and unsupported_fmts)
if not supports_quorum():
notrun('quorum support missing')
+def qemu_pipe(*args):
+ '''Run qemu with an option to print something and exit (e.g. a help option),
+ and return its output'''
+ args = [qemu_prog] + qemu_opts + list(args)
+ subp = subprocess.Popen(args, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True)
+ exitcode = subp.wait()
+ if exitcode < 0:
+ sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
+ ' '.join(args)))
+ return subp.communicate()[0]
+
+def supported_formats(read_only=False):
+ '''Set 'read_only' to True to check ro-whitelist
+ Otherwise, rw-whitelist is checked'''
+ format_message = qemu_pipe("-drive", "format=help")
+ line = 1 if read_only else 0
+ return format_message.splitlines()[line].split(":")[1].split()
+
+def skip_if_unsupported(required_formats=[], read_only=False):
+ '''Skip Test Decorator
+ Runs the test if all the required formats are whitelisted'''
+ def skip_test_decorator(func):
+ def func_wrapper(*args, **kwargs):
+ usf_list = list(set(required_formats) -
+ set(supported_formats(read_only)))
+ if usf_list:
+ case_notrun('{}: formats {} are not whitelisted'.format(
+ args[0], usf_list))
+ else:
+ return func(*args, **kwargs)
+ return func_wrapper
+ return skip_test_decorator
+
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
unsupported_fmts=[]):
'''Run tests'''