Git Repo - qemu.git/blobdiff - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging
[qemu.git] / tests / qemu-iotests / iotests.py
index cbedfaf1df670ca29cf2838af370785d051142e6..997dc910cb04841c59f17e24d9e98b28aae11368 100644 (file)
@@ -32,8 +32,8 @@ import atexit
 import io
 from collections import OrderedDict
 
-sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
-import qtest
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
+from qemu import qtest
 
 
 # This will not work if arguments contain spaces but is necessary if we
@@ -76,15 +76,18 @@ def qemu_img(*args):
         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
     return exitcode
 
-def ordered_kwargs(kwargs):
-    # kwargs prior to 3.6 are not ordered, so:
-    od = OrderedDict()
-    for k, v in sorted(kwargs.items()):
-        if isinstance(v, dict):
-            od[k] = ordered_kwargs(v)
-        else:
-            od[k] = v
-    return od
+def ordered_qmp(qmsg, conv_keys=True):
+    '''Return a copy of a QMP message in which every dictionary is an
+       OrderedDict sorted by key.
+
+       Dictionaries are not ordered prior to 3.6, therefore each dict is
+       rebuilt in sorted key order.  Lists are rebuilt element by element
+       and any other value is returned unchanged.
+
+       If conv_keys is true, '_' is replaced by '-' in the keys of the
+       outermost dictionaries (the message itself, or the elements of a
+       top-level list).  Dictionaries nested below another dictionary
+       always keep their keys verbatim.'''
+    if isinstance(qmsg, list):
+        # Hand conv_keys down, so that a list found below a dictionary
+        # does not switch key conversion back on for its elements
+        return [ordered_qmp(atom, conv_keys=conv_keys) for atom in qmsg]
+    if isinstance(qmsg, dict):
+        od = OrderedDict()
+        for k, v in sorted(qmsg.items()):
+            if conv_keys:
+                k = k.replace('_', '-')
+            # Only the outermost level of keys is ever converted
+            od[k] = ordered_qmp(v, conv_keys=False)
+        return od
+    return qmsg
 
 def qemu_img_create(*args):
     args = list(args)
@@ -201,6 +204,20 @@ def qemu_nbd(*args):
     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
     return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
 
+def qemu_nbd_pipe(*args):
+    '''Run qemu-nbd in daemon mode and return both the parent's exit code
+       and its output (stdout and stderr combined, as text)'''
+    full_args = qemu_nbd_args + ['--fork'] + list(args)
+    subp = subprocess.Popen(full_args,
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.STDOUT,
+                            universal_newlines=True)
+    # Drain the pipe before looking at the exit status: calling wait()
+    # first can deadlock once the child has filled the OS pipe buffer
+    output = subp.communicate()[0]
+    exitcode = subp.returncode
+    if exitcode < 0:
+        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
+                         (-exitcode, ' '.join(full_args)))
+    return exitcode, output
+
+
 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
     '''Return True if two image files are identical'''
     return qemu_img('compare', '-f', fmt1,
@@ -221,6 +238,12 @@ def image_size(img):
     r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
     return json.loads(r)['virtual-size']
 
+def is_str(val):
+    '''Return True if val is a text string: str on Python 3, str or
+       unicode on Python 2'''
+    if sys.version_info.major < 3:
+        # The name 'unicode' only exists on Python 2
+        return isinstance(val, (str, unicode))
+    return isinstance(val, str)
+
+# Pattern built from test_dir; filter_test_dir() replaces its matches
+# with "TEST_DIR".
+# NOTE(review): test_dir is used as a regular expression without
+# re.escape() -- confirm the path never contains regex metacharacters.
 test_dir_re = re.compile(r"%s" % test_dir)
 def filter_test_dir(msg):
+    '''Replace every match of test_dir_re in msg with "TEST_DIR"'''
     return test_dir_re.sub("TEST_DIR", msg)
@@ -268,7 +291,7 @@ def filter_testfiles(msg):
 
 def filter_qmp_testfiles(qmsg):
+    '''Run qmsg through filter_qmp(), applying filter_testfiles() to
+       every string value handed to the callback'''
     def _filter(key, value):
-        if key == 'filename' or key == 'backing-file':
+        # The key is not looked at: any string value is filtered
+        if is_str(value):
             return filter_testfiles(value)
         return value
     return filter_qmp(qmsg, _filter)
@@ -285,9 +308,20 @@ def filter_img_info(output, filename):
                    .replace(imgfmt, 'IMGFMT')
         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
         line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
+        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
         lines.append(line)
     return '\n'.join(lines)
 
+def filter_imgfmt(msg):
+    '''Replace every occurrence of imgfmt in msg with "IMGFMT"'''
+    filtered = msg.replace(imgfmt, 'IMGFMT')
+    return filtered
+
+def filter_qmp_imgfmt(qmsg):
+    '''Run qmsg through filter_qmp(), applying filter_imgfmt() to every
+       string value handed to the callback'''
+    def _filter(key, value):
+        # Non-string values are passed through untouched
+        return filter_imgfmt(value) if is_str(value) else value
+    return filter_qmp(qmsg, _filter)
+
 def log(msg, filters=[], indent=None):
     '''Logs either a string message or a JSON serializable message (like QMP).
     If indent is provided, JSON serializable messages are pretty-printed.'''
@@ -491,14 +525,16 @@ class VM(qtest.QEMUQtestMachine):
     def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
+        '''Log a QMP command, execute it, log its result and return the
+           result.  filters and indent are passed on to log().'''
+        # The logged arguments go through ordered_qmp(); the command
+        # itself is sent below with the unmodified kwargs
         full_cmd = OrderedDict((
             ("execute", cmd),
-            ("arguments", ordered_kwargs(kwargs))
+            ("arguments", ordered_qmp(kwargs))
         ))
         log(full_cmd, filters, indent=indent)
         result = self.qmp(cmd, **kwargs)
         log(result, filters, indent=indent)
         return result
 
+    # Returns None on success, and an error string on failure
     def run_job(self, job, auto_finalize=True, auto_dismiss=False):
+        error = None
         while True:
             for ev in self.get_qmp_events_filtered(wait=True):
                 if ev['event'] == 'JOB_STATUS_CHANGE':
@@ -507,16 +543,24 @@ class VM(qtest.QEMUQtestMachine):
                         result = self.qmp('query-jobs')
                         for j in result['return']:
                             if j['id'] == job:
+                                error = j['error']
                                 log('Job failed: %s' % (j['error']))
                     elif status == 'pending' and not auto_finalize:
                         self.qmp_log('job-finalize', id=job)
                     elif status == 'concluded' and not auto_dismiss:
                         self.qmp_log('job-dismiss', id=job)
                     elif status == 'null':
-                        return
+                        return error
                 else:
                     iotests.log(ev)
 
+    def node_info(self, node_name):
+        '''Return the first entry of query-named-block-nodes whose
+           'node-name' equals node_name, or None if there is none'''
+        result = self.qmp('query-named-block-nodes')
+        matches = (node for node in result['return']
+                   if node['node-name'] == node_name)
+        return next(matches, None)
+
 
 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
 
@@ -668,10 +712,18 @@ def notrun(reason):
     # Each test in qemu-iotests has a number ("seq")
     seq = os.path.basename(sys.argv[0])
 
-    open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
+    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
     print('%s not run: %s' % (seq, reason))
     sys.exit(0)
 
+def case_notrun(reason):
+    '''Skip this test case
+
+       Appends a "[case not run]" line containing reason to the
+       <seq>.casenotrun file in output_dir.'''
+    # Each test in qemu-iotests has a number ("seq")
+    seq = os.path.basename(sys.argv[0])
+
+    # Use a context manager so the file is closed (and the line flushed)
+    # right away instead of whenever the file object gets collected
+    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
+        outfile.write('    [case not run] ' + reason + '\n')
+
 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
     assert not (supported_fmts and unsupported_fmts)
 
@@ -712,6 +764,41 @@ def verify_quorum():
     if not supports_quorum():
         notrun('quorum support missing')
 
+def qemu_pipe(*args):
+    '''Run qemu with an option to print something and exit (e.g. a help option),
+    and return its output (stdout and stderr combined, as text)'''
+    args = [qemu_prog] + qemu_opts + list(args)
+    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
+                            stderr=subprocess.STDOUT,
+                            universal_newlines=True)
+    # Drain the pipe before looking at the exit status: calling wait()
+    # first can deadlock once the child has filled the OS pipe buffer
+    output = subp.communicate()[0]
+    exitcode = subp.returncode
+    if exitcode < 0:
+        sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
+                         ' '.join(args)))
+    return output
+
+def supported_formats(read_only=False):
+    '''Set 'read_only' to True to check ro-whitelist
+       Otherwise, rw-whitelist is checked'''
+    # Line 1 of the help output is used for read-only, line 0 otherwise
+    if read_only:
+        line = 1
+    else:
+        line = 0
+    help_lines = qemu_pipe("-drive", "format=help").splitlines()
+    # The field after the first ':' is a whitespace separated list
+    formats = help_lines[line].split(":")[1]
+    return formats.split()
+
+def skip_if_unsupported(required_formats=[], read_only=False):
+    '''Skip Test Decorator
+       Runs the test if all the required formats are whitelisted
+
+       Otherwise the decorated function is not called: the missing
+       formats are recorded with case_notrun() and the wrapper returns
+       None.  read_only is passed on to supported_formats().'''
+    # Imported locally so this block does not depend on the import list
+    import functools
+
+    def skip_test_decorator(func):
+        # Keep the name and docstring of the decorated test
+        @functools.wraps(func)
+        def func_wrapper(*args, **kwargs):
+            usf_list = list(set(required_formats) -
+                            set(supported_formats(read_only)))
+            if usf_list:
+                # args[0] is presumably the test case instance -- confirm
+                case_notrun('{}: formats {} are not whitelisted'.format(
+                    args[0], usf_list))
+            else:
+                return func(*args, **kwargs)
+        return func_wrapper
+    return skip_test_decorator
+
 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
          unsupported_fmts=[]):
     '''Run tests'''
This page took 0.030832 seconds and 4 git commands to generate.