]> Git Repo - qemu.git/blobdiff - tests/qemu-iotests/iotests.py
iotests: Add qemu_io_log()
[qemu.git] / tests / qemu-iotests / iotests.py
index 9fb5181c3dc4e58112f21aece13cd1c3dd21aa65..fc78852ae58f3636939af65ce8f62733ca01e614 100644 (file)
@@ -47,6 +47,11 @@ qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
 if os.environ.get('QEMU_IO_OPTIONS'):
     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
 
+qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
+if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
+    qemu_io_args_no_fmt += \
+        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
+
 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
 if os.environ.get('QEMU_NBD_OPTIONS'):
     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
@@ -57,6 +62,7 @@ qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
 imgfmt = os.environ.get('IMGFMT', 'raw')
 imgproto = os.environ.get('IMGPROTO', 'file')
 test_dir = os.environ.get('TEST_DIR')
+sock_dir = os.environ.get('SOCK_DIR')
 output_dir = os.environ.get('OUTPUT_DIR', '.')
 cachemode = os.environ.get('CACHEMODE')
 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
@@ -156,6 +162,11 @@ def qemu_io(*args):
         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
     return subp.communicate()[0]
 
+def qemu_io_log(*args):
+    result = qemu_io(*args)
+    log(result, filters=[filter_testfiles, filter_qemu_io])
+    return result
+
 def qemu_io_silent(*args):
     '''Run qemu-io and return the exit code, suppressing stdout'''
     args = qemu_io_args + list(args)
@@ -165,6 +176,13 @@ def qemu_io_silent(*args):
                          (-exitcode, ' '.join(args)))
     return exitcode
 
+def qemu_io_silent_check(*args):
+    '''Run qemu-io and return True if the subprocess returned 0'''
+    args = qemu_io_args + list(args)
+    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
+                               stderr=subprocess.STDOUT)
+    return exitcode == 0
+
 def get_virtio_scsi_device():
     if qemu_default_machine == 's390-ccw-virtio':
         return 'virtio-scsi-ccw'
@@ -230,6 +248,10 @@ def qemu_nbd_early_pipe(*args):
     else:
         return exitcode, subp.communicate()[0]
 
+def qemu_nbd_popen(*args):
+    '''Run qemu-nbd with --persistent and return its Popen object'''
+    return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
+
 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
     '''Return True if two image files are identical'''
     return qemu_img('compare', '-f', fmt1,
@@ -374,10 +396,10 @@ class FilePaths(object):
             qemu_img('create', img_path, '1G')
         # migration_sock_path is automatically deleted
     """
-    def __init__(self, names):
+    def __init__(self, names, base_dir=test_dir):
         self.paths = []
         for name in names:
-            self.paths.append(os.path.join(test_dir, file_pattern(name)))
+            self.paths.append(os.path.join(base_dir, file_pattern(name)))
 
     def __enter__(self):
         return self.paths
@@ -394,8 +416,8 @@ class FilePath(FilePaths):
     """
     FilePath is a specialization of FilePaths that takes a single filename.
     """
-    def __init__(self, name):
-        super(FilePath, self).__init__([name])
+    def __init__(self, name, base_dir=test_dir):
+        super(FilePath, self).__init__([name], base_dir)
 
     def __enter__(self):
         return self.paths[0]
@@ -408,7 +430,7 @@ def file_path_remover():
             pass
 
 
-def file_path(*names):
+def file_path(*names, base_dir=test_dir):
     ''' Another way to get auto-generated filename that cleans itself up.
 
     Use is as simple as:
@@ -424,7 +446,7 @@ def file_path(*names):
     paths = []
     for name in names:
         filename = file_pattern(name)
-        path = os.path.join(test_dir, filename)
+        path = os.path.join(base_dir, filename)
         file_path_remover.paths.append(path)
         paths.append(path)
 
@@ -445,7 +467,8 @@ class VM(qtest.QEMUQtestMachine):
         name = "qemu%s-%d" % (path_suffix, os.getpid())
         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                  test_dir=test_dir,
-                                 socket_scm_helper=socket_scm_helper)
+                                 socket_scm_helper=socket_scm_helper,
+                                 sock_dir=sock_dir)
         self._num_drives = 0
 
     def add_object(self, opts):
@@ -641,6 +664,33 @@ class VM(qtest.QEMUQtestMachine):
                 return x
         return None
 
+    def query_bitmaps(self):
+        res = self.qmp("query-named-block-nodes")
+        return {device['node-name']: device['dirty-bitmaps']
+                for device in res['return'] if 'dirty-bitmaps' in device}
+
+    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
+        """
+        Get a specific bitmap from the object returned by query_bitmaps().
+        :param recording: If specified, filter results by the specified value.
+        :param bitmaps: If specified, use it instead of calling query_bitmaps()
+        """
+        if bitmaps is None:
+            bitmaps = self.query_bitmaps()
+
+        for bitmap in bitmaps[node_name]:
+            if bitmap.get('name', '') == bitmap_name:
+                if recording is None:
+                    return bitmap
+                elif bitmap.get('recording') == recording:
+                    return bitmap
+        return None
+
+    def check_bitmap_status(self, node_name, bitmap_name, fields):
+        ret = self.get_bitmap(node_name, bitmap_name)
+
+        return fields.items() <= ret.items()
+
 
 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
 
@@ -691,8 +741,8 @@ class QMPTestCase(unittest.TestCase):
             self.fail('no match for "%s" in %s' % (str(result), str(value)))
         else:
             self.assertEqual(result, value,
-                             'values not equal "%s" and "%s"'
-                                 % (str(result), str(value)))
+                             '"%s" is "%s", expected "%s"'
+                                 % (path, str(result), str(value)))
 
     def assert_no_active_block_jobs(self):
         result = self.vm.qmp('query-block-jobs')
@@ -800,6 +850,11 @@ class QMPTestCase(unittest.TestCase):
             return self.pause_wait(job_id)
         return result
 
+    def case_skip(self, reason):
+        '''Skip this test case'''
+        case_notrun(reason)
+        self.skipTest(reason)
+
 
 def notrun(reason):
     '''Skip this test suite'''
@@ -811,7 +866,11 @@ def notrun(reason):
     sys.exit(0)
 
 def case_notrun(reason):
-    '''Skip this test case'''
+    '''Mark this test case as not having been run (without actually
+    skipping it, that is left to the caller).  See
+    QMPTestCase.case_skip() for a variant that actually skips the
+    current test case.'''
+
     # Each test in qemu-iotests has a number ("seq")
     seq = os.path.basename(sys.argv[0])
 
@@ -874,25 +933,47 @@ def qemu_pipe(*args):
 def supported_formats(read_only=False):
     '''Set 'read_only' to True to check ro-whitelist
        Otherwise, rw-whitelist is checked'''
-    format_message = qemu_pipe("-drive", "format=help")
-    line = 1 if read_only else 0
-    return format_message.splitlines()[line].split(":")[1].split()
+
+    if not hasattr(supported_formats, "formats"):
+        supported_formats.formats = {}
+
+    if read_only not in supported_formats.formats:
+        format_message = qemu_pipe("-drive", "format=help")
+        line = 1 if read_only else 0
+        supported_formats.formats[read_only] = \
+            format_message.splitlines()[line].split(":")[1].split()
+
+    return supported_formats.formats[read_only]
 
 def skip_if_unsupported(required_formats=[], read_only=False):
     '''Skip Test Decorator
        Runs the test if all the required formats are whitelisted'''
     def skip_test_decorator(func):
-        def func_wrapper(*args, **kwargs):
-            usf_list = list(set(required_formats) -
-                            set(supported_formats(read_only)))
+        def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
+            if callable(required_formats):
+                fmts = required_formats(test_case)
+            else:
+                fmts = required_formats
+
+            usf_list = list(set(fmts) - set(supported_formats(read_only)))
             if usf_list:
-                case_notrun('{}: formats {} are not whitelisted'.format(
-                    args[0], usf_list))
+                test_case.case_skip('{}: formats {} are not whitelisted'.format(
+                    test_case, usf_list))
             else:
-                return func(*args, **kwargs)
+                return func(test_case, *args, **kwargs)
         return func_wrapper
     return skip_test_decorator
 
+def skip_if_user_is_root(func):
+    '''Skip Test Decorator
+       Runs the test only without root permissions'''
+    def func_wrapper(*args, **kwargs):
+        if os.getuid() == 0:
+            case_notrun('{}: cannot be run as root'.format(args[0]))
+        else:
+            return func(*args, **kwargs)
+    return func_wrapper
+
 def execute_unittest(output, verbosity, debug):
     runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                      verbosity=verbosity)
@@ -902,8 +983,15 @@ def execute_unittest(output, verbosity, debug):
         unittest.main(testRunner=runner)
     finally:
         if not debug:
-            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s',
-                                    r'Ran \1 tests', output.getvalue()))
+            out = output.getvalue()
+            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
+
+            # Hide skipped tests from the reference output
+            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
+            out_first_line, out_rest = out.split('\n', 1)
+            out = out_first_line.replace('s', '.') + '\n' + out_rest
+
+            sys.stderr.write(out)
 
 def execute_test(test_function=None,
                  supported_fmts=[], supported_oses=['linux'],
This page took 0.03274 seconds and 4 git commands to generate.