]> Git Repo - qemu.git/blobdiff - tests/qemu-iotests/iotests.py
iotests: Add qemu_io_log()
[qemu.git] / tests / qemu-iotests / iotests.py
index 84438e837cb347857dcb2c98afd1786969fe685b..fc78852ae58f3636939af65ce8f62733ca01e614 100644 (file)
@@ -35,6 +35,7 @@ from collections import OrderedDict
 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
 from qemu import qtest
 
+assert sys.version_info >= (3,6)
 
 # This will not work if arguments contain spaces but is necessary if we
 # want to support the override options that ./check supports.
@@ -46,6 +47,11 @@ qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
 if os.environ.get('QEMU_IO_OPTIONS'):
     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
 
+qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
+if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
+    qemu_io_args_no_fmt += \
+        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
+
 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
 if os.environ.get('QEMU_NBD_OPTIONS'):
     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
@@ -56,6 +62,7 @@ qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
 imgfmt = os.environ.get('IMGFMT', 'raw')
 imgproto = os.environ.get('IMGPROTO', 'file')
 test_dir = os.environ.get('TEST_DIR')
+sock_dir = os.environ.get('SOCK_DIR')
 output_dir = os.environ.get('OUTPUT_DIR', '.')
 cachemode = os.environ.get('CACHEMODE')
 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
@@ -155,6 +162,11 @@ def qemu_io(*args):
         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
     return subp.communicate()[0]
 
+def qemu_io_log(*args):
+    result = qemu_io(*args)
+    log(result, filters=[filter_testfiles, filter_qemu_io])
+    return result
+
 def qemu_io_silent(*args):
     '''Run qemu-io and return the exit code, suppressing stdout'''
     args = qemu_io_args + list(args)
@@ -164,6 +176,13 @@ def qemu_io_silent(*args):
                          (-exitcode, ' '.join(args)))
     return exitcode
 
+def qemu_io_silent_check(*args):
+    '''Run qemu-io and return the true if subprocess returned 0'''
+    args = qemu_io_args + list(args)
+    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
+                               stderr=subprocess.STDOUT)
+    return exitcode == 0
+
 def get_virtio_scsi_device():
     if qemu_default_machine == 's390-ccw-virtio':
         return 'virtio-scsi-ccw'
@@ -229,6 +248,10 @@ def qemu_nbd_early_pipe(*args):
     else:
         return exitcode, subp.communicate()[0]
 
+def qemu_nbd_popen(*args):
+    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
+    return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
+
 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
     '''Return True if two image files are identical'''
     return qemu_img('compare', '-f', fmt1,
@@ -250,10 +273,7 @@ def image_size(img):
     return json.loads(r)['virtual-size']
 
 def is_str(val):
-    if sys.version_info.major >= 3:
-        return isinstance(val, str)
-    else:
-        return isinstance(val, str) or isinstance(val, unicode)
+    return isinstance(val, str)
 
 test_dir_re = re.compile(r"%s" % test_dir)
 def filter_test_dir(msg):
@@ -376,10 +396,10 @@ class FilePaths(object):
             qemu_img('create', img_path, '1G')
         # migration_sock_path is automatically deleted
     """
-    def __init__(self, names):
+    def __init__(self, names, base_dir=test_dir):
         self.paths = []
         for name in names:
-            self.paths.append(os.path.join(test_dir, file_pattern(name)))
+            self.paths.append(os.path.join(base_dir, file_pattern(name)))
 
     def __enter__(self):
         return self.paths
@@ -396,8 +416,8 @@ class FilePath(FilePaths):
     """
     FilePath is a specialization of FilePaths that takes a single filename.
     """
-    def __init__(self, name):
-        super(FilePath, self).__init__([name])
+    def __init__(self, name, base_dir=test_dir):
+        super(FilePath, self).__init__([name], base_dir)
 
     def __enter__(self):
         return self.paths[0]
@@ -410,7 +430,7 @@ def file_path_remover():
             pass
 
 
-def file_path(*names):
+def file_path(*names, base_dir=test_dir):
     ''' Another way to get auto-generated filename that cleans itself up.
 
     Use is as simple as:
@@ -426,7 +446,7 @@ def file_path(*names):
     paths = []
     for name in names:
         filename = file_pattern(name)
-        path = os.path.join(test_dir, filename)
+        path = os.path.join(base_dir, filename)
         file_path_remover.paths.append(path)
         paths.append(path)
 
@@ -447,7 +467,8 @@ class VM(qtest.QEMUQtestMachine):
         name = "qemu%s-%d" % (path_suffix, os.getpid())
         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                  test_dir=test_dir,
-                                 socket_scm_helper=socket_scm_helper)
+                                 socket_scm_helper=socket_scm_helper,
+                                 sock_dir=sock_dir)
         self._num_drives = 0
 
     def add_object(self, opts):
@@ -643,6 +664,33 @@ class VM(qtest.QEMUQtestMachine):
                 return x
         return None
 
+    def query_bitmaps(self):
+        res = self.qmp("query-named-block-nodes")
+        return {device['node-name']: device['dirty-bitmaps']
+                for device in res['return'] if 'dirty-bitmaps' in device}
+
+    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
+        """
+        get a specific bitmap from the object returned by query_bitmaps.
+        :param recording: If specified, filter results by the specified value.
+        :param bitmaps: If specified, use it instead of call query_bitmaps()
+        """
+        if bitmaps is None:
+            bitmaps = self.query_bitmaps()
+
+        for bitmap in bitmaps[node_name]:
+            if bitmap.get('name', '') == bitmap_name:
+                if recording is None:
+                    return bitmap
+                elif bitmap.get('recording') == recording:
+                    return bitmap
+        return None
+
+    def check_bitmap_status(self, node_name, bitmap_name, fields):
+        ret = self.get_bitmap(node_name, bitmap_name)
+
+        return fields.items() <= ret.items()
+
 
 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
 
@@ -693,8 +741,8 @@ class QMPTestCase(unittest.TestCase):
             self.fail('no match for "%s" in %s' % (str(result), str(value)))
         else:
             self.assertEqual(result, value,
-                             'values not equal "%s" and "%s"'
-                                 % (str(result), str(value)))
+                             '"%s" is "%s", expected "%s"'
+                                 % (path, str(result), str(value)))
 
     def assert_no_active_block_jobs(self):
         result = self.vm.qmp('query-block-jobs')
@@ -802,6 +850,11 @@ class QMPTestCase(unittest.TestCase):
             return self.pause_wait(job_id)
         return result
 
+    def case_skip(self, reason):
+        '''Skip this test case'''
+        case_notrun(reason)
+        self.skipTest(reason)
+
 
 def notrun(reason):
     '''Skip this test suite'''
@@ -813,7 +866,11 @@ def notrun(reason):
     sys.exit(0)
 
 def case_notrun(reason):
-    '''Skip this test case'''
+    '''Mark this test case as not having been run (without actually
+    skipping it, that is left to the caller).  See
+    QMPTestCase.case_skip() for a variant that actually skips the
+    current test case.'''
+
     # Each test in qemu-iotests has a number ("seq")
     seq = os.path.basename(sys.argv[0])
 
@@ -876,25 +933,47 @@ def qemu_pipe(*args):
 def supported_formats(read_only=False):
     '''Set 'read_only' to True to check ro-whitelist
        Otherwise, rw-whitelist is checked'''
-    format_message = qemu_pipe("-drive", "format=help")
-    line = 1 if read_only else 0
-    return format_message.splitlines()[line].split(":")[1].split()
+
+    if not hasattr(supported_formats, "formats"):
+        supported_formats.formats = {}
+
+    if read_only not in supported_formats.formats:
+        format_message = qemu_pipe("-drive", "format=help")
+        line = 1 if read_only else 0
+        supported_formats.formats[read_only] = \
+            format_message.splitlines()[line].split(":")[1].split()
+
+    return supported_formats.formats[read_only]
 
 def skip_if_unsupported(required_formats=[], read_only=False):
     '''Skip Test Decorator
        Runs the test if all the required formats are whitelisted'''
     def skip_test_decorator(func):
-        def func_wrapper(*args, **kwargs):
-            usf_list = list(set(required_formats) -
-                            set(supported_formats(read_only)))
+        def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
+            if callable(required_formats):
+                fmts = required_formats(test_case)
+            else:
+                fmts = required_formats
+
+            usf_list = list(set(fmts) - set(supported_formats(read_only)))
             if usf_list:
-                case_notrun('{}: formats {} are not whitelisted'.format(
-                    args[0], usf_list))
+                test_case.case_skip('{}: formats {} are not whitelisted'.format(
+                    test_case, usf_list))
             else:
-                return func(*args, **kwargs)
+                return func(test_case, *args, **kwargs)
         return func_wrapper
     return skip_test_decorator
 
+def skip_if_user_is_root(func):
+    '''Skip Test Decorator
+       Runs the test only without root permissions'''
+    def func_wrapper(*args, **kwargs):
+        if os.getuid() == 0:
+            case_notrun('{}: cannot be run as root'.format(args[0]))
+        else:
+            return func(*args, **kwargs)
+    return func_wrapper
+
 def execute_unittest(output, verbosity, debug):
     runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                      verbosity=verbosity)
@@ -904,12 +983,20 @@ def execute_unittest(output, verbosity, debug):
         unittest.main(testRunner=runner)
     finally:
         if not debug:
-            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s',
-                                    r'Ran \1 tests', output.getvalue()))
+            out = output.getvalue()
+            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
+
+            # Hide skipped tests from the reference output
+            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
+            out_first_line, out_rest = out.split('\n', 1)
+            out = out_first_line.replace('s', '.') + '\n' + out_rest
+
+            sys.stderr.write(out)
 
 def execute_test(test_function=None,
                  supported_fmts=[], supported_oses=['linux'],
-                 supported_cache_modes=[], unsupported_fmts=[]):
+                 supported_cache_modes=[], unsupported_fmts=[],
+                 supported_protocols=[], unsupported_protocols=[]):
     """Run either unittest or script-style tests."""
 
     # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
@@ -923,6 +1010,7 @@ def execute_test(test_function=None,
     debug = '-d' in sys.argv
     verbosity = 1
     verify_image_format(supported_fmts, unsupported_fmts)
+    verify_protocol(supported_protocols, unsupported_protocols)
     verify_platform(supported_oses)
     verify_cache_mode(supported_cache_modes)
 
@@ -933,12 +1021,7 @@ def execute_test(test_function=None,
     else:
         # We need to filter out the time taken from the output so that
         # qemu-iotest can reliably diff the results against master output.
-        if sys.version_info.major >= 3:
-            output = io.StringIO()
-        else:
-            # io.StringIO is for unicode strings, which is not what
-            # 2.x's test runner emits.
-            output = io.BytesIO()
+        output = io.StringIO()
 
     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
 
This page took 0.033359 seconds and 4 git commands to generate.