]> Git Repo - qemu.git/blobdiff - tests/qemu-iotests/iotests.py
iotests: Add qemu_io_log()
[qemu.git] / tests / qemu-iotests / iotests.py
index 693fde155a43bcad30fc59afd3bd5a652723f064..fc78852ae58f3636939af65ce8f62733ca01e614 100644 (file)
@@ -47,6 +47,11 @@ qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
 if os.environ.get('QEMU_IO_OPTIONS'):
     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
 
+qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
+if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
+    qemu_io_args_no_fmt += \
+        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
+
 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
 if os.environ.get('QEMU_NBD_OPTIONS'):
     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
@@ -57,6 +62,7 @@ qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
 imgfmt = os.environ.get('IMGFMT', 'raw')
 imgproto = os.environ.get('IMGPROTO', 'file')
 test_dir = os.environ.get('TEST_DIR')
+sock_dir = os.environ.get('SOCK_DIR')
 output_dir = os.environ.get('OUTPUT_DIR', '.')
 cachemode = os.environ.get('CACHEMODE')
 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
@@ -156,6 +162,11 @@ def qemu_io(*args):
         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
     return subp.communicate()[0]
 
+def qemu_io_log(*args):
+    result = qemu_io(*args)
+    log(result, filters=[filter_testfiles, filter_qemu_io])
+    return result
+
 def qemu_io_silent(*args):
     '''Run qemu-io and return the exit code, suppressing stdout'''
     args = qemu_io_args + list(args)
@@ -385,10 +396,10 @@ class FilePaths(object):
             qemu_img('create', img_path, '1G')
         # migration_sock_path is automatically deleted
     """
-    def __init__(self, names):
+    def __init__(self, names, base_dir=test_dir):
         self.paths = []
         for name in names:
-            self.paths.append(os.path.join(test_dir, file_pattern(name)))
+            self.paths.append(os.path.join(base_dir, file_pattern(name)))
 
     def __enter__(self):
         return self.paths
@@ -405,8 +416,8 @@ class FilePath(FilePaths):
     """
     FilePath is a specialization of FilePaths that takes a single filename.
     """
-    def __init__(self, name):
-        super(FilePath, self).__init__([name])
+    def __init__(self, name, base_dir=test_dir):
+        super(FilePath, self).__init__([name], base_dir)
 
     def __enter__(self):
         return self.paths[0]
@@ -419,7 +430,7 @@ def file_path_remover():
             pass
 
 
-def file_path(*names):
+def file_path(*names, base_dir=test_dir):
     ''' Another way to get auto-generated filename that cleans itself up.
 
     Use is as simple as:
@@ -435,7 +446,7 @@ def file_path(*names):
     paths = []
     for name in names:
         filename = file_pattern(name)
-        path = os.path.join(test_dir, filename)
+        path = os.path.join(base_dir, filename)
         file_path_remover.paths.append(path)
         paths.append(path)
 
@@ -456,7 +467,8 @@ class VM(qtest.QEMUQtestMachine):
         name = "qemu%s-%d" % (path_suffix, os.getpid())
         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                  test_dir=test_dir,
-                                 socket_scm_helper=socket_scm_helper)
+                                 socket_scm_helper=socket_scm_helper,
+                                 sock_dir=sock_dir)
         self._num_drives = 0
 
     def add_object(self, opts):
@@ -729,8 +741,8 @@ class QMPTestCase(unittest.TestCase):
             self.fail('no match for "%s" in %s' % (str(result), str(value)))
         else:
             self.assertEqual(result, value,
-                             'values not equal "%s" and "%s"'
-                                 % (str(result), str(value)))
+                             '"%s" is "%s", expected "%s"'
+                                 % (path, str(result), str(value)))
 
     def assert_no_active_block_jobs(self):
         result = self.vm.qmp('query-block-jobs')
@@ -838,6 +850,11 @@ class QMPTestCase(unittest.TestCase):
             return self.pause_wait(job_id)
         return result
 
+    def case_skip(self, reason):
+        '''Skip this test case'''
+        case_notrun(reason)
+        self.skipTest(reason)
+
 
 def notrun(reason):
     '''Skip this test suite'''
@@ -849,7 +866,11 @@ def notrun(reason):
     sys.exit(0)
 
 def case_notrun(reason):
-    '''Skip this test case'''
+    '''Mark this test case as not having been run (without actually
+    skipping it, that is left to the caller).  See
+    QMPTestCase.case_skip() for a variant that actually skips the
+    current test case.'''
+
     # Each test in qemu-iotests has a number ("seq")
     seq = os.path.basename(sys.argv[0])
 
@@ -912,25 +933,47 @@ def qemu_pipe(*args):
 def supported_formats(read_only=False):
     '''Set 'read_only' to True to check ro-whitelist
        Otherwise, rw-whitelist is checked'''
-    format_message = qemu_pipe("-drive", "format=help")
-    line = 1 if read_only else 0
-    return format_message.splitlines()[line].split(":")[1].split()
+
+    if not hasattr(supported_formats, "formats"):
+        supported_formats.formats = {}
+
+    if read_only not in supported_formats.formats:
+        format_message = qemu_pipe("-drive", "format=help")
+        line = 1 if read_only else 0
+        supported_formats.formats[read_only] = \
+            format_message.splitlines()[line].split(":")[1].split()
+
+    return supported_formats.formats[read_only]
 
 def skip_if_unsupported(required_formats=[], read_only=False):
     '''Skip Test Decorator
        Runs the test if all the required formats are whitelisted'''
     def skip_test_decorator(func):
-        def func_wrapper(*args, **kwargs):
-            usf_list = list(set(required_formats) -
-                            set(supported_formats(read_only)))
+        def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
+            if callable(required_formats):
+                fmts = required_formats(test_case)
+            else:
+                fmts = required_formats
+
+            usf_list = list(set(fmts) - set(supported_formats(read_only)))
             if usf_list:
-                case_notrun('{}: formats {} are not whitelisted'.format(
-                    args[0], usf_list))
+                test_case.case_skip('{}: formats {} are not whitelisted'.format(
+                    test_case, usf_list))
             else:
-                return func(*args, **kwargs)
+                return func(test_case, *args, **kwargs)
         return func_wrapper
     return skip_test_decorator
 
+def skip_if_user_is_root(func):
+    '''Skip Test Decorator
+       Runs the test only without root permissions'''
+    def func_wrapper(*args, **kwargs):
+        if os.getuid() == 0:
+            case_notrun('{}: cannot be run as root'.format(args[0]))
+        else:
+            return func(*args, **kwargs)
+    return func_wrapper
+
 def execute_unittest(output, verbosity, debug):
     runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                      verbosity=verbosity)
@@ -940,8 +983,15 @@ def execute_unittest(output, verbosity, debug):
         unittest.main(testRunner=runner)
     finally:
         if not debug:
-            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s',
-                                    r'Ran \1 tests', output.getvalue()))
+            out = output.getvalue()
+            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
+
+            # Hide skipped tests from the reference output
+            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
+            out_first_line, out_rest = out.split('\n', 1)
+            out = out_first_line.replace('s', '.') + '\n' + out_rest
+
+            sys.stderr.write(out)
 
 def execute_test(test_function=None,
                  supported_fmts=[], supported_oses=['linux'],
This page took 0.030645 seconds and 4 git commands to generate.