X-Git-Url: https://repo.jachan.dev/qemu.git/blobdiff_plain/2d894bee1c907399fbd59163a18e42c41daf4863..f5b4c31030f45293bb4517445722768434829d91:/tests/qemu-iotests/iotests.py diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index a0f35e4b68..997dc910cb 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -30,9 +30,10 @@ import signal import logging import atexit import io +from collections import OrderedDict -sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts')) -import qtest +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) +from qemu import qtest # This will not work if arguments contain spaces but is necessary if we @@ -63,7 +64,7 @@ socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') debug = False luks_default_secret_object = 'secret,id=keysec0,data=' + \ - os.environ['IMGKEYSECRET'] + os.environ.get('IMGKEYSECRET', '') luks_default_key_secret_opt = 'key-secret=keysec0' @@ -75,6 +76,19 @@ def qemu_img(*args): sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) return exitcode +def ordered_qmp(qmsg, conv_keys=True): + # Dictionaries are not ordered prior to 3.6, therefore: + if isinstance(qmsg, list): + return [ordered_qmp(atom) for atom in qmsg] + if isinstance(qmsg, dict): + od = OrderedDict() + for k, v in sorted(qmsg.items()): + if conv_keys: + k = k.replace('_', '-') + od[k] = ordered_qmp(v, conv_keys=False) + return od + return qmsg + def qemu_img_create(*args): args = list(args) @@ -190,6 +204,20 @@ def qemu_nbd(*args): '''Run qemu-nbd in daemon mode and return the parent's exit code''' return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) +def qemu_nbd_pipe(*args): + '''Run qemu-nbd in daemon mode and return both the parent's exit code + and its output''' + subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True) + exitcode = subp.wait() + if exitcode < 0: + sys.stderr.write('qemu-nbd received signal %i: %s\n' % + (-exitcode, + ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) + return exitcode, subp.communicate()[0] + def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): '''Return True if two image files are identical''' return qemu_img('compare', '-f', fmt1, @@ -210,6 +238,12 @@ def image_size(img): r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) return json.loads(r)['virtual-size'] +def is_str(val): + if sys.version_info.major >= 3: + return isinstance(val, str) + else: + return isinstance(val, str) or isinstance(val, unicode) + test_dir_re = re.compile(r"%s" % test_dir) def filter_test_dir(msg): return test_dir_re.sub("TEST_DIR", msg) @@ -235,10 +269,36 @@ def filter_qmp_event(event): event['timestamp']['microseconds'] = 'USECS' return event +def filter_qmp(qmsg, filter_fn): + '''Given a string filter, filter a QMP object's values. + filter_fn takes a (key, value) pair.''' + # Iterate through either lists or dicts; + if isinstance(qmsg, list): + items = enumerate(qmsg) + else: + items = qmsg.items() + + for k, v in items: + if isinstance(v, list) or isinstance(v, dict): + qmsg[k] = filter_qmp(v, filter_fn) + else: + qmsg[k] = filter_fn(k, v) + return qmsg + def filter_testfiles(msg): prefix = os.path.join(test_dir, "%s-" % (os.getpid())) return msg.replace(prefix, 'TEST_DIR/PID-') +def filter_qmp_testfiles(qmsg): + def _filter(key, value): + if is_str(value): + return filter_testfiles(value) + return value + return filter_qmp(qmsg, _filter) + +def filter_generated_node_ids(msg): + return re.sub("#block[0-9]+", "NODE_NAME", msg) + def filter_img_info(output, filename): lines = [] for line in output.split('\n'): @@ -248,13 +308,34 @@ def filter_img_info(output, filename): .replace(imgfmt, 'IMGFMT') line = re.sub('iters: [0-9]+', 'iters: XXX', line) line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line) + line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) lines.append(line) return '\n'.join(lines) -def log(msg, filters=[]): +def filter_imgfmt(msg): + return msg.replace(imgfmt, 'IMGFMT') + +def filter_qmp_imgfmt(qmsg): + def _filter(key, value): + if is_str(value): + return filter_imgfmt(value) + return value + return filter_qmp(qmsg, _filter) + +def log(msg, filters=[], indent=None): + '''Logs either a string message or a JSON serializable message (like QMP). + If indent is provided, JSON serializable messages are pretty-printed.''' for flt in filters: msg = flt(msg) - print(msg) + if isinstance(msg, dict) or isinstance(msg, list): + # Python < 3.4 needs to know not to add whitespace when pretty-printing: + separators = (', ', ': ') if indent is None else (',', ': ') + # Don't sort if it's already sorted + do_sort = not isinstance(msg, OrderedDict) + print(json.dumps(msg, sort_keys=do_sort, + indent=indent, separators=separators)) + else: + print(msg) class Timeout: def __init__(self, seconds, errmsg = "Timeout"): @@ -441,14 +522,19 @@ class VM(qtest.QEMUQtestMachine): result.append(filter_qmp_event(ev)) return result - def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs): - logmsg = "{'execute': '%s', 'arguments': %s}" % (cmd, kwargs) - log(logmsg, filters) + def qmp_log(self, cmd, filters=[], indent=None, **kwargs): + full_cmd = OrderedDict(( + ("execute", cmd), + ("arguments", ordered_qmp(kwargs)) + )) + log(full_cmd, filters, indent=indent) result = self.qmp(cmd, **kwargs) - log(str(result), filters) + log(result, filters, indent=indent) return result + # Returns None on success, and an error string on failure def run_job(self, job, auto_finalize=True, auto_dismiss=False): + error = None while True: for ev in self.get_qmp_events_filtered(wait=True): if ev['event'] == 'JOB_STATUS_CHANGE': @@ -457,16 +543,24 @@ class VM(qtest.QEMUQtestMachine): result = self.qmp('query-jobs') for j in result['return']: if j['id'] == job: + error = j['error'] log('Job failed: %s' % (j['error'])) elif status == 'pending' and not auto_finalize: self.qmp_log('job-finalize', id=job) elif status == 'concluded' and not auto_dismiss: self.qmp_log('job-dismiss', id=job) elif status == 'null': - return + return error else: iotests.log(ev) + def node_info(self, node_name): + nodes = self.qmp('query-named-block-nodes') + for x in nodes['return']: + if x['node-name'] == node_name: + return x + return None + index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') @@ -577,7 +671,7 @@ class QMPTestCase(unittest.TestCase): def wait_ready_and_cancel(self, drive='drive0'): self.wait_ready(drive=drive) event = self.cancel_and_wait(drive=drive) - self.assertEquals(event['event'], 'BLOCK_JOB_COMPLETED') + self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') self.assert_qmp(event, 'data/type', 'mirror') self.assert_qmp(event, 'data/offset', event['data']['len']) @@ -618,10 +712,18 @@ def notrun(reason): # Each test in qemu-iotests has a number ("seq") seq = os.path.basename(sys.argv[0]) - open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n') + open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') print('%s not run: %s' % (seq, reason)) sys.exit(0) +def case_notrun(reason): + '''Skip this test case''' + # Each test in qemu-iotests has a number ("seq") + seq = os.path.basename(sys.argv[0]) + + open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( + ' [case not run] ' + reason + '\n') + def verify_image_format(supported_fmts=[], unsupported_fmts=[]): assert not (supported_fmts and unsupported_fmts) @@ -662,6 +764,41 @@ def verify_quorum(): if not supports_quorum(): notrun('quorum support missing') +def qemu_pipe(*args): + '''Run qemu with an option to print something and exit (e.g. a help option), + and return its output''' + args = [qemu_prog] + qemu_opts + list(args) + subp = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True) + exitcode = subp.wait() + if exitcode < 0: + sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode, + ' '.join(args))) + return subp.communicate()[0] + +def supported_formats(read_only=False): + '''Set 'read_only' to True to check ro-whitelist + Otherwise, rw-whitelist is checked''' + format_message = qemu_pipe("-drive", "format=help") + line = 1 if read_only else 0 + return format_message.splitlines()[line].split(":")[1].split() + +def skip_if_unsupported(required_formats=[], read_only=False): + '''Skip Test Decorator + Runs the test if all the required formats are whitelisted''' + def skip_test_decorator(func): + def func_wrapper(*args, **kwargs): + usf_list = list(set(required_formats) - + set(supported_formats(read_only))) + if usf_list: + case_notrun('{}: formats {} are not whitelisted'.format( + args[0], usf_list)) + else: + return func(*args, **kwargs) + return func_wrapper + return skip_test_decorator + def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[], unsupported_fmts=[]): '''Run tests'''