import qtest
import struct
import json
+import signal
+import logging
# This will not work if arguments contain spaces but is necessary if we
def filter_chown(msg):
return chown_re.sub("chown UID:GID", msg)
+def filter_qmp_event(event):
+ '''Filter a QMP event dict'''
+ event = dict(event)
+ if 'timestamp' in event:
+ event['timestamp']['seconds'] = 'SECS'
+ event['timestamp']['microseconds'] = 'USECS'
+ return event
+
def log(msg, filters=[]):
for flt in filters:
msg = flt(msg)
print msg
+class Timeout:
+ def __init__(self, seconds, errmsg = "Timeout"):
+ self.seconds = seconds
+ self.errmsg = errmsg
+ def __enter__(self):
+ signal.signal(signal.SIGALRM, self.timeout)
+ signal.setitimer(signal.ITIMER_REAL, self.seconds)
+ return self
+ def __exit__(self, type, value, traceback):
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ return False
+ def timeout(self, signum, frame):
+ raise Exception(self.errmsg)
+
+
+class FilePath(object):
+ '''An auto-generated filename that cleans itself up.
+
+ Use this context manager to generate filenames and ensure that the file
+ gets deleted::
+
+ with TestFilePath('test.img') as img_path:
+ qemu_img('create', img_path, '1G')
+ # migration_sock_path is automatically deleted
+ '''
+ def __init__(self, name):
+ filename = '{0}-{1}'.format(os.getpid(), name)
+ self.path = os.path.join(test_dir, filename)
+
+ def __enter__(self):
+ return self.path
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ try:
+ os.remove(self.path)
+ except OSError:
+ pass
+ return False
+
+
class VM(qtest.QEMUQtestMachine):
'''A QEMU VM'''
super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
test_dir=test_dir,
socket_scm_helper=socket_scm_helper)
- if debug:
- self._debug = True
self._num_drives = 0
+ def add_object(self, opts):
+ self._args.append('-object')
+ self._args.append(opts)
+ return self
+
def add_device(self, opts):
self._args.append('-device')
self._args.append(opts)
self._num_drives += 1
return self
+ def add_blockdev(self, opts):
+ self._args.append('-blockdev')
+ if isinstance(opts, str):
+ self._args.append(opts)
+ else:
+ self._args.append(','.join(opts))
+ return self
+
+ def add_incoming(self, addr):
+ self._args.append('-incoming')
+ self._args.append(addr)
+ return self
+
def pause_drive(self, drive, event=None):
'''Pause drive r/w operations'''
if not event:
output[basestr[:-1]] = obj # Strip trailing '.'
return output
+ def qmp_to_opts(self, obj):
+ obj = self.flatten_qmp_object(obj)
+ output_list = list()
+ for key in obj:
+ output_list += [key + '=' + obj[key]]
+ return ','.join(output_list)
+
def assert_qmp_absent(self, d, path):
try:
result = self.dictpath(d, path)
event = self.wait_until_completed(drive=drive)
self.assert_qmp(event, 'data/type', 'mirror')
+ def pause_job(self, job_id='job0'):
+ result = self.vm.qmp('block-job-pause', device=job_id)
+ self.assert_qmp(result, 'return', {})
+
+ with Timeout(1, "Timeout waiting for job to pause"):
+ while True:
+ result = self.vm.qmp('query-block-jobs')
+ for job in result['return']:
+ if job['device'] == job_id and job['paused'] == True and job['busy'] == False:
+ return job
+
+
def notrun(reason):
'''Skip this test suite'''
# Each test in qemu-iotests has a number ("seq")
print '%s not run: %s' % (seq, reason)
sys.exit(0)
-def verify_image_format(supported_fmts=[]):
+def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
if supported_fmts and (imgfmt not in supported_fmts):
notrun('not suitable for this image format: %s' % imgfmt)
+ if unsupported_fmts and (imgfmt in unsupported_fmts):
+ notrun('not suitable for this image format: %s' % imgfmt)
def verify_platform(supported_oses=['linux']):
if True not in [sys.platform.startswith(x) for x in supported_oses]:
notrun('not suitable for this OS: %s' % sys.platform)
+def supports_quorum():
+ return 'quorum' in qemu_img_pipe('--help')
+
def verify_quorum():
'''Skip test suite if quorum support is not available'''
- if 'quorum' not in qemu_img_pipe('--help'):
+ if not supports_quorum():
notrun('quorum support missing')
def main(supported_fmts=[], supported_oses=['linux']):
else:
output = StringIO.StringIO()
+ logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
+
class MyTestRunner(unittest.TextTestRunner):
def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)