import logging
import atexit
import io
+from collections import OrderedDict
-sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
-import qtest
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
+from qemu import qtest
# This will not work if arguments contain spaces but is necessary if we
debug = False
luks_default_secret_object = 'secret,id=keysec0,data=' + \
- os.environ['IMGKEYSECRET']
+ os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
+def ordered_qmp(qmsg, conv_keys=True):
+    '''Return a copy of qmsg in which every dict is replaced by an
+    OrderedDict whose keys are inserted in sorted order.
+
+    Lists are rebuilt element by element (each element is processed with
+    the default conv_keys); values that are neither list nor dict are
+    returned unchanged.  If conv_keys is true, '_' is replaced by '-' in
+    the keys of the dict passed in; the values of that dict are always
+    processed with conv_keys=False.'''
+    # Dictionaries are not ordered prior to 3.6, therefore:
+    if isinstance(qmsg, dict):
+        ordered = OrderedDict()
+        for key in sorted(qmsg):
+            out_key = key.replace('_', '-') if conv_keys else key
+            ordered[out_key] = ordered_qmp(qmsg[key], conv_keys=False)
+        return ordered
+    if isinstance(qmsg, list):
+        return [ordered_qmp(item) for item in qmsg]
+    return qmsg
+
def qemu_img_create(*args):
args = list(args)
'''Run qemu-nbd in daemon mode and return the parent's exit code'''
return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
+def qemu_nbd_pipe(*args):
+    '''Run qemu-nbd in daemon mode and return both the parent's exit code
+    and its output
+
+    args are appended to qemu_nbd_args + ['--fork'].  stderr is merged into
+    stdout, and the result is the tuple (exitcode, output).  A negative
+    exit code (death by signal) is additionally reported on sys.stderr.'''
+    cmd = qemu_nbd_args + ['--fork'] + list(args)
+    subp = subprocess.Popen(cmd,
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.STDOUT,
+                            universal_newlines=True)
+    # Drain the pipe before reaping the process: calling wait() first can
+    # deadlock once the child has written enough to fill the pipe buffer.
+    output = subp.communicate()[0]
+    exitcode = subp.returncode
+    if exitcode < 0:
+        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
+                         (-exitcode, ' '.join(cmd)))
+    return exitcode, output
+
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
'''Return True if two image files are identical'''
return qemu_img('compare', '-f', fmt1,
r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
return json.loads(r)['virtual-size']
+def is_str(val):
+    '''Return True if val is a string: a str on Python 3 and later, a str
+    or unicode object on older interpreters.'''
+    if sys.version_info.major < 3:
+        # 'unicode' is only looked up on this branch
+        return isinstance(val, (str, unicode))
+    return isinstance(val, str)
+
test_dir_re = re.compile(r"%s" % test_dir)
def filter_test_dir(msg):
return test_dir_re.sub("TEST_DIR", msg)
event['timestamp']['microseconds'] = 'USECS'
return event
+def filter_qmp(qmsg, filter_fn):
+    '''Apply filter_fn to every non-container value of a QMP object.
+
+    qmsg is a list or a dict and is modified in place: nested lists and
+    dicts are descended into, and every other value is replaced by
+    filter_fn(key, value), where key is the dict key or the list index.
+    The same qmsg object is returned.'''
+    # Both containers are walked as (key, value) pairs
+    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()
+
+    for key, value in entries:
+        if isinstance(value, (list, dict)):
+            replacement = filter_qmp(value, filter_fn)
+        else:
+            replacement = filter_fn(key, value)
+        qmsg[key] = replacement
+    return qmsg
+
def filter_testfiles(msg):
prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
return msg.replace(prefix, 'TEST_DIR/PID-')
+def filter_qmp_testfiles(qmsg):
+    '''Run filter_testfiles over every string value of a QMP object (via
+    filter_qmp) and return the result; other values are left alone.'''
+    def _scrub(_key, value):
+        return filter_testfiles(value) if is_str(value) else value
+    return filter_qmp(qmsg, _scrub)
+
+def filter_generated_node_ids(msg):
+    '''Replace every "#block" followed by one or more digits in msg with
+    "NODE_NAME" and return the resulting string.'''
+    node_id_re = re.compile("#block[0-9]+")
+    return node_id_re.sub("NODE_NAME", msg)
+
def filter_img_info(output, filename):
lines = []
for line in output.split('\n'):
.replace(imgfmt, 'IMGFMT')
line = re.sub('iters: [0-9]+', 'iters: XXX', line)
line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+ line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
lines.append(line)
return '\n'.join(lines)
-def log(msg, filters=[]):
+def filter_imgfmt(msg):
+    '''Return msg with every occurrence of the module-level imgfmt string
+    replaced by 'IMGFMT'.'''
+    filtered = msg.replace(imgfmt, 'IMGFMT')
+    return filtered
+
+def filter_qmp_imgfmt(qmsg):
+    '''Run filter_imgfmt over every string value of a QMP object (via
+    filter_qmp) and return the result; other values are left alone.'''
+    def _scrub(_key, value):
+        return filter_imgfmt(value) if is_str(value) else value
+    return filter_qmp(qmsg, _scrub)
+
+def log(msg, filters=[], indent=None):
+ '''Logs either a string message or a JSON serializable message (like QMP).
+ If indent is provided, JSON serializable messages are pretty-printed.'''
for flt in filters:
msg = flt(msg)
- if type(msg) is dict or type(msg) is list:
- print(json.dumps(msg, sort_keys=True))
+ if isinstance(msg, dict) or isinstance(msg, list):
+ # Python < 3.4 needs to know not to add whitespace when pretty-printing:
+ separators = (', ', ': ') if indent is None else (',', ': ')
+ # Don't sort if it's already sorted
+ do_sort = not isinstance(msg, OrderedDict)
+ print(json.dumps(msg, sort_keys=do_sort,
+ indent=indent, separators=separators))
else:
print(msg)
result.append(filter_qmp_event(ev))
return result
- def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
- logmsg = '{"execute": "%s", "arguments": %s}' % \
- (cmd, json.dumps(kwargs, sort_keys=True))
- log(logmsg, filters)
+ def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
+ full_cmd = OrderedDict((
+ ("execute", cmd),
+ ("arguments", ordered_qmp(kwargs))
+ ))
+ log(full_cmd, filters, indent=indent)
result = self.qmp(cmd, **kwargs)
- log(json.dumps(result, sort_keys=True), filters)
+ log(result, filters, indent=indent)
return result
+ # Returns None on success, and an error string on failure
def run_job(self, job, auto_finalize=True, auto_dismiss=False):
+ error = None
while True:
for ev in self.get_qmp_events_filtered(wait=True):
if ev['event'] == 'JOB_STATUS_CHANGE':
result = self.qmp('query-jobs')
for j in result['return']:
if j['id'] == job:
+ error = j['error']
log('Job failed: %s' % (j['error']))
elif status == 'pending' and not auto_finalize:
self.qmp_log('job-finalize', id=job)
elif status == 'concluded' and not auto_dismiss:
self.qmp_log('job-dismiss', id=job)
elif status == 'null':
- return
+ return error
else:
iotests.log(ev)
+ def node_info(self, node_name):
+     '''Issue 'query-named-block-nodes' and return the first entry of its
+     'return' list whose 'node-name' equals node_name, or None if no
+     entry matches.'''
+     reply = self.qmp('query-named-block-nodes')
+     matches = (node for node in reply['return']
+                if node['node-name'] == node_name)
+     return next(matches, None)
+
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
def wait_ready_and_cancel(self, drive='drive0'):
self.wait_ready(drive=drive)
event = self.cancel_and_wait(drive=drive)
- self.assertEquals(event['event'], 'BLOCK_JOB_COMPLETED')
+ self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
self.assert_qmp(event, 'data/type', 'mirror')
self.assert_qmp(event, 'data/offset', event['data']['len'])
# Each test in qemu-iotests has a number ("seq")
seq = os.path.basename(sys.argv[0])
- open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
+ open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
print('%s not run: %s' % (seq, reason))
sys.exit(0)
+def case_notrun(reason):
+    '''Skip this test case
+
+    Appends a ' [case not run] <reason>' line to the file
+    '<output_dir>/<seq>.casenotrun', where seq is the basename of
+    sys.argv[0].'''
+    # Each test in qemu-iotests has a number ("seq")
+    seq = os.path.basename(sys.argv[0])
+
+    # Use a context manager so the line is flushed and the file is closed
+    # deterministically rather than whenever the handle is collected.
+    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
+        outfile.write(' [case not run] ' + reason + '\n')
+
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
assert not (supported_fmts and unsupported_fmts)
if not supports_quorum():
notrun('quorum support missing')
+def qemu_pipe(*args):
+    '''Run qemu with an option to print something and exit (e.g. a help option),
+    and return its output
+
+    The command line is [qemu_prog] + qemu_opts + args.  stderr is merged
+    into stdout and the combined text is returned.  A negative exit code
+    (death by signal) is additionally reported on sys.stderr.'''
+    args = [qemu_prog] + qemu_opts + list(args)
+    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
+                            stderr=subprocess.STDOUT,
+                            universal_newlines=True)
+    # Drain the pipe before reaping the process: calling wait() first can
+    # deadlock once the child has written enough to fill the pipe buffer.
+    output = subp.communicate()[0]
+    exitcode = subp.returncode
+    if exitcode < 0:
+        sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
+                         ' '.join(args)))
+    return output
+
+def supported_formats(read_only=False):
+    '''Set 'read_only' to True to check ro-whitelist
+    Otherwise, rw-whitelist is checked
+
+    Runs qemu with "-drive format=help", picks the second output line when
+    read_only is true and the first one otherwise, and returns the
+    whitespace-separated words found after the first ':' on that line.'''
+    help_lines = qemu_pipe("-drive", "format=help").splitlines()
+    whitelist_line = help_lines[1] if read_only else help_lines[0]
+    return whitelist_line.split(":")[1].split()
+
+def skip_if_unsupported(required_formats=[], read_only=False):
+    '''Skip Test Decorator
+    Runs the test if all the required formats are whitelisted
+
+    The check happens each time the decorated function is called: if any
+    entry of required_formats is absent from supported_formats(read_only),
+    case_notrun() is called with a message naming the first positional
+    argument and the missing formats, and the function is not run.'''
+    def skip_test_decorator(func):
+        def func_wrapper(*args, **kwargs):
+            missing = list(set(required_formats) -
+                           set(supported_formats(read_only)))
+            if not missing:
+                return func(*args, **kwargs)
+            case_notrun('{}: formats {} are not whitelisted'.format(
+                args[0], missing))
+        return func_wrapper
+    return skip_test_decorator
+
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
unsupported_fmts=[]):
'''Run tests'''