]> Git Repo - qemu.git/blobdiff - tests/qemu-iotests/iotests.py
block: Don't block_job_pause_all() in bdrv_drain_all()
[qemu.git] / tests / qemu-iotests / iotests.py
index 1f30cfcc7547cda4ba94d51d51a0ffa2506c34a6..44477e9295895a752a08b5d60ef79cf2acbc7850 100644 (file)
@@ -27,6 +27,8 @@ sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
 import qtest
 import struct
 import json
+import signal
+import logging
 
 
 # This will not work if arguments contain spaces but is necessary if we
@@ -132,11 +134,59 @@ chown_re = re.compile(r"chown [0-9]+:[0-9]+")
 def filter_chown(msg):
+    '''Replace every "chown <digits>:<digits>" in msg with "chown UID:GID"'''
     return chown_re.sub("chown UID:GID", msg)
 
+def filter_qmp_event(event):
+    '''Return a copy of a QMP event dict with its timestamp masked
+
+    If the event has a 'timestamp' entry, its 'seconds' and 'microseconds'
+    fields are replaced by the placeholders 'SECS' and 'USECS'.  The event
+    passed in by the caller is left unmodified.
+    '''
+    event = dict(event)
+    if 'timestamp' in event:
+        # dict(event) only copies the top level, so build a new nested dict
+        # instead of writing the placeholders into the caller's timestamp.
+        event['timestamp'] = dict(event['timestamp'],
+                                  seconds='SECS', microseconds='USECS')
+    return event
+
 def log(msg, filters=[]):
+    '''Pass msg through each function in filters, in order, and print it'''
     for flt in filters:
         msg = flt(msg)
     print msg
 
+class Timeout:
+    '''Context manager that raises an Exception if its body runs too long
+
+    On entry, a SIGALRM handler is installed and a real-time interval timer
+    is armed for `seconds`.  If the timer fires before the block is left,
+    Exception(errmsg) is raised from the handler.  On exit, the timer is
+    disarmed and the SIGALRM handler that was in place before entry is
+    put back.
+    '''
+    def __init__(self, seconds, errmsg = "Timeout"):
+        self.seconds = seconds
+        self.errmsg = errmsg
+        self._old_handler = None
+    def __enter__(self):
+        # signal.signal() returns the handler being replaced; keep it so
+        # that __exit__ can restore it
+        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
+        signal.setitimer(signal.ITIMER_REAL, self.seconds)
+        return self
+    def __exit__(self, type, value, traceback):
+        # Disarm first so the alarm cannot fire once the handler is swapped
+        signal.setitimer(signal.ITIMER_REAL, 0)
+        # None means the previous handler was not installed from Python and
+        # cannot be handed back to signal.signal()
+        if self._old_handler is not None:
+            signal.signal(signal.SIGALRM, self._old_handler)
+            self._old_handler = None
+        # Never swallow an exception raised inside the block
+        return False
+    def timeout(self, signum, frame):
+        '''SIGALRM handler: abort the guarded block'''
+        raise Exception(self.errmsg)
+
+
+class FilePath(object):
+    '''An auto-generated filename that cleans itself up.
+
+    Use this context manager to generate filenames and ensure that the file
+    gets deleted::
+
+        with FilePath('test.img') as img_path:
+            qemu_img('create', img_path, '1G')
+        # img_path is automatically deleted
+
+    The generated path is '<pid>-<name>' inside test_dir.
+    '''
+    def __init__(self, name):
+        # Prefix the name with the PID of the current process
+        filename = '{0}-{1}'.format(os.getpid(), name)
+        self.path = os.path.join(test_dir, filename)
+
+    def __enter__(self):
+        # The with-statement target is the path string, not this object
+        return self.path
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # Removal is best-effort: an OSError (for instance because the file
+        # was never created) is ignored
+        try:
+            os.remove(self.path)
+        except OSError:
+            pass
+        # Let any exception raised inside the block propagate
+        return False
+
+
 class VM(qtest.QEMUQtestMachine):
     '''A QEMU VM'''
 
@@ -145,10 +195,13 @@ class VM(qtest.QEMUQtestMachine):
         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                  test_dir=test_dir,
                                  socket_scm_helper=socket_scm_helper)
-        if debug:
-            self._debug = True
         self._num_drives = 0
 
+    def add_object(self, opts):
+        '''Append '-object' and opts to self._args; returns self'''
+        self._args.extend(['-object', opts])
+        return self
+
     def add_device(self, opts):
         self._args.append('-device')
         self._args.append(opts)
@@ -177,6 +230,19 @@ class VM(qtest.QEMUQtestMachine):
         self._num_drives += 1
         return self
 
+    def add_blockdev(self, opts):
+        '''Append '-blockdev' and its option string to self._args
+
+        opts is either a ready-made option string, or an iterable of
+        strings that gets joined with commas.  Returns self.
+        '''
+        self._args.append('-blockdev')
+        self._args.append(opts if isinstance(opts, str) else ','.join(opts))
+        return self
+
+    def add_incoming(self, addr):
+        '''Append '-incoming' and addr to self._args; returns self'''
+        self._args.extend(['-incoming', addr])
+        return self
+
     def pause_drive(self, drive, event=None):
         '''Pause drive r/w operations'''
         if not event:
@@ -235,6 +301,13 @@ class QMPTestCase(unittest.TestCase):
             output[basestr[:-1]] = obj # Strip trailing '.'
         return output
 
+    def qmp_to_opts(self, obj):
+        '''Render a QMP object as a "key=value,key=value" option string
+
+        The object is first flattened with flatten_qmp_object(); the
+        flattened values are concatenated as-is, so they must be strings.
+        '''
+        flat = self.flatten_qmp_object(obj)
+        return ','.join([key + '=' + flat[key] for key in flat])
+
     def assert_qmp_absent(self, d, path):
         try:
             result = self.dictpath(d, path)
@@ -331,6 +404,18 @@ class QMPTestCase(unittest.TestCase):
         event = self.wait_until_completed(drive=drive)
         self.assert_qmp(event, 'data/type', 'mirror')
 
+    def pause_job(self, job_id='job0'):
+        '''Pause a block job and wait until it reports being paused
+
+        Issues block-job-pause, then polls query-block-jobs until the job
+        whose 'device' is job_id is paused and not busy, and returns that
+        job's entry.  Timeout raises an Exception if this takes longer
+        than one second.
+        '''
+        reply = self.vm.qmp('block-job-pause', device=job_id)
+        self.assert_qmp(reply, 'return', {})
+
+        with Timeout(1, "Timeout waiting for job to pause"):
+            while True:
+                jobs = self.vm.qmp('query-block-jobs')['return']
+                for job in jobs:
+                    if job['device'] != job_id:
+                        continue
+                    if job['paused'] == True and job['busy'] == False:
+                        return job
+
+
 def notrun(reason):
     '''Skip this test suite'''
     # Each test in qemu-iotests has a number ("seq")
@@ -340,17 +425,22 @@ def notrun(reason):
     print '%s not run: %s' % (seq, reason)
     sys.exit(0)
 
-def verify_image_format(supported_fmts=[]):
+def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
+    '''Skip test suite if imgfmt is not suitable
+
+    The suite is skipped when supported_fmts is non-empty and does not
+    contain imgfmt, or when unsupported_fmts is non-empty and contains it.
+    '''
     if supported_fmts and (imgfmt not in supported_fmts):
         notrun('not suitable for this image format: %s' % imgfmt)
+    if unsupported_fmts and (imgfmt in unsupported_fmts):
+        notrun('not suitable for this image format: %s' % imgfmt)
 
 def verify_platform(supported_oses=['linux']):
+    '''Skip test suite unless sys.platform starts with a supported OS name'''
     if True not in [sys.platform.startswith(x) for x in supported_oses]:
         notrun('not suitable for this OS: %s' % sys.platform)
 
+def supports_quorum():
+    '''Tell whether the output of qemu_img_pipe('--help') mentions quorum'''
+    return 'quorum' in qemu_img_pipe('--help')
+
 def verify_quorum():
     '''Skip test suite if quorum support is not available'''
-    if 'quorum' not in qemu_img_pipe('--help'):
+    # Detection lives in supports_quorum() so that it can also be queried
+    # without skipping the suite
+    if not supports_quorum():
         notrun('quorum support missing')
 
 def main(supported_fmts=[], supported_oses=['linux']):
@@ -381,6 +471,8 @@ def main(supported_fmts=[], supported_oses=['linux']):
     else:
         output = StringIO.StringIO()
 
+    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
+
     class MyTestRunner(unittest.TextTestRunner):
         def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
             unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
This page took 0.029353 seconds and 4 git commands to generate.