]> Git Repo - qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/bonzini/tags/for-upstream' into staging
[qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import errno
20 import os
21 import re
22 import subprocess
23 import string
24 import unittest
25 import sys
26 import struct
27 import json
28 import signal
29 import logging
30 import atexit
31
32 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
33 import qtest
34
35
36 # This will not work if arguments contain spaces but is necessary if we
37 # want to support the override options that ./check supports.
38 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
39 if os.environ.get('QEMU_IMG_OPTIONS'):
40     qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
41
42 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
43 if os.environ.get('QEMU_IO_OPTIONS'):
44     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
45
46 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
47 if os.environ.get('QEMU_NBD_OPTIONS'):
48     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
49
50 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
51 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
52
53 imgfmt = os.environ.get('IMGFMT', 'raw')
54 imgproto = os.environ.get('IMGPROTO', 'file')
55 test_dir = os.environ.get('TEST_DIR')
56 output_dir = os.environ.get('OUTPUT_DIR', '.')
57 cachemode = os.environ.get('CACHEMODE')
58 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
59
60 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
61 debug = False
62
63 luks_default_secret_object = 'secret,id=keysec0,data=' + \
64                              os.environ['IMGKEYSECRET']
65 luks_default_key_secret_opt = 'key-secret=keysec0'
66
67
68 def qemu_img(*args):
69     '''Run qemu-img and return the exit code'''
70     devnull = open('/dev/null', 'r+')
71     exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
72     if exitcode < 0:
73         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
74     return exitcode
75
76 def qemu_img_create(*args):
77     args = list(args)
78
79     # default luks support
80     if '-f' in args and args[args.index('-f') + 1] == 'luks':
81         if '-o' in args:
82             i = args.index('-o')
83             if 'key-secret' not in args[i + 1]:
84                 args[i + 1].append(luks_default_key_secret_opt)
85                 args.insert(i + 2, '--object')
86                 args.insert(i + 3, luks_default_secret_object)
87         else:
88             args = ['-o', luks_default_key_secret_opt,
89                     '--object', luks_default_secret_object] + args
90
91     args.insert(0, 'create')
92
93     return qemu_img(*args)
94
95 def qemu_img_verbose(*args):
96     '''Run qemu-img without suppressing its output and return the exit code'''
97     exitcode = subprocess.call(qemu_img_args + list(args))
98     if exitcode < 0:
99         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
100     return exitcode
101
102 def qemu_img_pipe(*args):
103     '''Run qemu-img and return its output'''
104     subp = subprocess.Popen(qemu_img_args + list(args),
105                             stdout=subprocess.PIPE,
106                             stderr=subprocess.STDOUT)
107     exitcode = subp.wait()
108     if exitcode < 0:
109         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
110     return subp.communicate()[0]
111
112 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
113     args = [ 'info' ]
114     if imgopts:
115         args.append('--image-opts')
116     else:
117         args += [ '-f', imgfmt ]
118     args += extra_args
119     args.append(filename)
120
121     output = qemu_img_pipe(*args)
122     if not filter_path:
123         filter_path = filename
124     log(filter_img_info(output, filter_path))
125
126 def qemu_io(*args):
127     '''Run qemu-io and return the stdout data'''
128     args = qemu_io_args + list(args)
129     subp = subprocess.Popen(args, stdout=subprocess.PIPE,
130                             stderr=subprocess.STDOUT)
131     exitcode = subp.wait()
132     if exitcode < 0:
133         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
134     return subp.communicate()[0]
135
136
137 class QemuIoInteractive:
138     def __init__(self, *args):
139         self.args = qemu_io_args + list(args)
140         self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
141                                    stdout=subprocess.PIPE,
142                                    stderr=subprocess.STDOUT)
143         assert self._p.stdout.read(9) == 'qemu-io> '
144
145     def close(self):
146         self._p.communicate('q\n')
147
148     def _read_output(self):
149         pattern = 'qemu-io> '
150         n = len(pattern)
151         pos = 0
152         s = []
153         while pos != n:
154             c = self._p.stdout.read(1)
155             # check unexpected EOF
156             assert c != ''
157             s.append(c)
158             if c == pattern[pos]:
159                 pos += 1
160             else:
161                 pos = 0
162
163         return ''.join(s[:-n])
164
165     def cmd(self, cmd):
166         # quit command is in close(), '\n' is added automatically
167         assert '\n' not in cmd
168         cmd = cmd.strip()
169         assert cmd != 'q' and cmd != 'quit'
170         self._p.stdin.write(cmd + '\n')
171         return self._read_output()
172
173
174 def qemu_nbd(*args):
175     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
176     return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
177
178 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
179     '''Return True if two image files are identical'''
180     return qemu_img('compare', '-f', fmt1,
181                     '-F', fmt2, img1, img2) == 0
182
183 def create_image(name, size):
184     '''Create a fully-allocated raw image with sector markers'''
185     file = open(name, 'w')
186     i = 0
187     while i < size:
188         sector = struct.pack('>l504xl', i / 512, i / 512)
189         file.write(sector)
190         i = i + 512
191     file.close()
192
193 def image_size(img):
194     '''Return image's virtual size'''
195     r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
196     return json.loads(r)['virtual-size']
197
198 test_dir_re = re.compile(r"%s" % test_dir)
199 def filter_test_dir(msg):
200     return test_dir_re.sub("TEST_DIR", msg)
201
202 win32_re = re.compile(r"\r")
203 def filter_win32(msg):
204     return win32_re.sub("", msg)
205
206 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
207 def filter_qemu_io(msg):
208     msg = filter_win32(msg)
209     return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
210
211 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
212 def filter_chown(msg):
213     return chown_re.sub("chown UID:GID", msg)
214
215 def filter_qmp_event(event):
216     '''Filter a QMP event dict'''
217     event = dict(event)
218     if 'timestamp' in event:
219         event['timestamp']['seconds'] = 'SECS'
220         event['timestamp']['microseconds'] = 'USECS'
221     return event
222
223 def filter_testfiles(msg):
224     prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
225     return msg.replace(prefix, 'TEST_DIR/PID-')
226
227 def filter_img_info(output, filename):
228     lines = []
229     for line in output.split('\n'):
230         if 'disk size' in line or 'actual-size' in line:
231             continue
232         line = line.replace(filename, 'TEST_IMG') \
233                    .replace(imgfmt, 'IMGFMT')
234         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
235         line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
236         lines.append(line)
237     return '\n'.join(lines)
238
239 def log(msg, filters=[]):
240     for flt in filters:
241         msg = flt(msg)
242     print msg
243
244 class Timeout:
245     def __init__(self, seconds, errmsg = "Timeout"):
246         self.seconds = seconds
247         self.errmsg = errmsg
248     def __enter__(self):
249         signal.signal(signal.SIGALRM, self.timeout)
250         signal.setitimer(signal.ITIMER_REAL, self.seconds)
251         return self
252     def __exit__(self, type, value, traceback):
253         signal.setitimer(signal.ITIMER_REAL, 0)
254         return False
255     def timeout(self, signum, frame):
256         raise Exception(self.errmsg)
257
258
259 class FilePath(object):
260     '''An auto-generated filename that cleans itself up.
261
262     Use this context manager to generate filenames and ensure that the file
263     gets deleted::
264
265         with TestFilePath('test.img') as img_path:
266             qemu_img('create', img_path, '1G')
267         # migration_sock_path is automatically deleted
268     '''
269     def __init__(self, name):
270         filename = '{0}-{1}'.format(os.getpid(), name)
271         self.path = os.path.join(test_dir, filename)
272
273     def __enter__(self):
274         return self.path
275
276     def __exit__(self, exc_type, exc_val, exc_tb):
277         try:
278             os.remove(self.path)
279         except OSError:
280             pass
281         return False
282
283
284 def file_path_remover():
285     for path in reversed(file_path_remover.paths):
286         try:
287             os.remove(path)
288         except OSError:
289             pass
290
291
292 def file_path(*names):
293     ''' Another way to get auto-generated filename that cleans itself up.
294
295     Use is as simple as:
296
297     img_a, img_b = file_path('a.img', 'b.img')
298     sock = file_path('socket')
299     '''
300
301     if not hasattr(file_path_remover, 'paths'):
302         file_path_remover.paths = []
303         atexit.register(file_path_remover)
304
305     paths = []
306     for name in names:
307         filename = '{0}-{1}'.format(os.getpid(), name)
308         path = os.path.join(test_dir, filename)
309         file_path_remover.paths.append(path)
310         paths.append(path)
311
312     return paths[0] if len(paths) == 1 else paths
313
314 def remote_filename(path):
315     if imgproto == 'file':
316         return path
317     elif imgproto == 'ssh':
318         return "ssh://127.0.0.1%s" % (path)
319     else:
320         raise Exception("Protocol %s not supported" % (imgproto))
321
322 class VM(qtest.QEMUQtestMachine):
323     '''A QEMU VM'''
324
325     def __init__(self, path_suffix=''):
326         name = "qemu%s-%d" % (path_suffix, os.getpid())
327         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
328                                  test_dir=test_dir,
329                                  socket_scm_helper=socket_scm_helper)
330         self._num_drives = 0
331
332     def add_object(self, opts):
333         self._args.append('-object')
334         self._args.append(opts)
335         return self
336
337     def add_device(self, opts):
338         self._args.append('-device')
339         self._args.append(opts)
340         return self
341
342     def add_drive_raw(self, opts):
343         self._args.append('-drive')
344         self._args.append(opts)
345         return self
346
347     def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
348         '''Add a virtio-blk drive to the VM'''
349         options = ['if=%s' % interface,
350                    'id=drive%d' % self._num_drives]
351
352         if path is not None:
353             options.append('file=%s' % path)
354             options.append('format=%s' % format)
355             options.append('cache=%s' % cachemode)
356
357         if opts:
358             options.append(opts)
359
360         if format == 'luks' and 'key-secret' not in opts:
361             # default luks support
362             if luks_default_secret_object not in self._args:
363                 self.add_object(luks_default_secret_object)
364
365             options.append(luks_default_key_secret_opt)
366
367         self._args.append('-drive')
368         self._args.append(','.join(options))
369         self._num_drives += 1
370         return self
371
372     def add_blockdev(self, opts):
373         self._args.append('-blockdev')
374         if isinstance(opts, str):
375             self._args.append(opts)
376         else:
377             self._args.append(','.join(opts))
378         return self
379
380     def add_incoming(self, addr):
381         self._args.append('-incoming')
382         self._args.append(addr)
383         return self
384
385     def pause_drive(self, drive, event=None):
386         '''Pause drive r/w operations'''
387         if not event:
388             self.pause_drive(drive, "read_aio")
389             self.pause_drive(drive, "write_aio")
390             return
391         self.qmp('human-monitor-command',
392                     command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
393
394     def resume_drive(self, drive):
395         self.qmp('human-monitor-command',
396                     command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
397
398     def hmp_qemu_io(self, drive, cmd):
399         '''Write to a given drive using an HMP command'''
400         return self.qmp('human-monitor-command',
401                         command_line='qemu-io %s "%s"' % (drive, cmd))
402
403     def flatten_qmp_object(self, obj, output=None, basestr=''):
404         if output is None:
405             output = dict()
406         if isinstance(obj, list):
407             for i in range(len(obj)):
408                 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
409         elif isinstance(obj, dict):
410             for key in obj:
411                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
412         else:
413             output[basestr[:-1]] = obj # Strip trailing '.'
414         return output
415
416     def qmp_to_opts(self, obj):
417         obj = self.flatten_qmp_object(obj)
418         output_list = list()
419         for key in obj:
420             output_list += [key + '=' + obj[key]]
421         return ','.join(output_list)
422
423     def get_qmp_events_filtered(self, wait=True):
424         result = []
425         for ev in self.get_qmp_events(wait=wait):
426             result.append(filter_qmp_event(ev))
427         return result
428
429     def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
430         logmsg = "{'execute': '%s', 'arguments': %s}" % (cmd, kwargs)
431         log(logmsg, filters)
432         result = self.qmp(cmd, **kwargs)
433         log(str(result), filters)
434         return result
435
436     def run_job(self, job, auto_finalize=True, auto_dismiss=False):
437         while True:
438             for ev in self.get_qmp_events_filtered(wait=True):
439                 if ev['event'] == 'JOB_STATUS_CHANGE':
440                     status = ev['data']['status']
441                     if status == 'aborting':
442                         result = self.qmp('query-jobs')
443                         for j in result['return']:
444                             if j['id'] == job:
445                                 log('Job failed: %s' % (j['error']))
446                     elif status == 'pending' and not auto_finalize:
447                         self.qmp_log('job-finalize', id=job)
448                     elif status == 'concluded' and not auto_dismiss:
449                         self.qmp_log('job-dismiss', id=job)
450                     elif status == 'null':
451                         return
452                 else:
453                     iotests.log(ev)
454
455
456 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
457
458 class QMPTestCase(unittest.TestCase):
459     '''Abstract base class for QMP test cases'''
460
461     def dictpath(self, d, path):
462         '''Traverse a path in a nested dict'''
463         for component in path.split('/'):
464             m = index_re.match(component)
465             if m:
466                 component, idx = m.groups()
467                 idx = int(idx)
468
469             if not isinstance(d, dict) or component not in d:
470                 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
471             d = d[component]
472
473             if m:
474                 if not isinstance(d, list):
475                     self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
476                 try:
477                     d = d[idx]
478                 except IndexError:
479                     self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
480         return d
481
482     def assert_qmp_absent(self, d, path):
483         try:
484             result = self.dictpath(d, path)
485         except AssertionError:
486             return
487         self.fail('path "%s" has value "%s"' % (path, str(result)))
488
489     def assert_qmp(self, d, path, value):
490         '''Assert that the value for a specific path in a QMP dict matches'''
491         result = self.dictpath(d, path)
492         self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
493
494     def assert_no_active_block_jobs(self):
495         result = self.vm.qmp('query-block-jobs')
496         self.assert_qmp(result, 'return', [])
497
498     def assert_has_block_node(self, node_name=None, file_name=None):
499         """Issue a query-named-block-nodes and assert node_name and/or
500         file_name is present in the result"""
501         def check_equal_or_none(a, b):
502             return a == None or b == None or a == b
503         assert node_name or file_name
504         result = self.vm.qmp('query-named-block-nodes')
505         for x in result["return"]:
506             if check_equal_or_none(x.get("node-name"), node_name) and \
507                     check_equal_or_none(x.get("file"), file_name):
508                 return
509         self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
510                 (node_name, file_name, result))
511
512     def assert_json_filename_equal(self, json_filename, reference):
513         '''Asserts that the given filename is a json: filename and that its
514            content is equal to the given reference object'''
515         self.assertEqual(json_filename[:5], 'json:')
516         self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
517                          self.vm.flatten_qmp_object(reference))
518
519     def cancel_and_wait(self, drive='drive0', force=False, resume=False):
520         '''Cancel a block job and wait for it to finish, returning the event'''
521         result = self.vm.qmp('block-job-cancel', device=drive, force=force)
522         self.assert_qmp(result, 'return', {})
523
524         if resume:
525             self.vm.resume_drive(drive)
526
527         cancelled = False
528         result = None
529         while not cancelled:
530             for event in self.vm.get_qmp_events(wait=True):
531                 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
532                    event['event'] == 'BLOCK_JOB_CANCELLED':
533                     self.assert_qmp(event, 'data/device', drive)
534                     result = event
535                     cancelled = True
536                 elif event['event'] == 'JOB_STATUS_CHANGE':
537                     self.assert_qmp(event, 'data/id', drive)
538
539
540         self.assert_no_active_block_jobs()
541         return result
542
543     def wait_until_completed(self, drive='drive0', check_offset=True):
544         '''Wait for a block job to finish, returning the event'''
545         while True:
546             for event in self.vm.get_qmp_events(wait=True):
547                 if event['event'] == 'BLOCK_JOB_COMPLETED':
548                     self.assert_qmp(event, 'data/device', drive)
549                     self.assert_qmp_absent(event, 'data/error')
550                     if check_offset:
551                         self.assert_qmp(event, 'data/offset', event['data']['len'])
552                     self.assert_no_active_block_jobs()
553                     return event
554                 elif event['event'] == 'JOB_STATUS_CHANGE':
555                     self.assert_qmp(event, 'data/id', drive)
556
557     def wait_ready(self, drive='drive0'):
558         '''Wait until a block job BLOCK_JOB_READY event'''
559         f = {'data': {'type': 'mirror', 'device': drive } }
560         event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
561
562     def wait_ready_and_cancel(self, drive='drive0'):
563         self.wait_ready(drive=drive)
564         event = self.cancel_and_wait(drive=drive)
565         self.assertEquals(event['event'], 'BLOCK_JOB_COMPLETED')
566         self.assert_qmp(event, 'data/type', 'mirror')
567         self.assert_qmp(event, 'data/offset', event['data']['len'])
568
569     def complete_and_wait(self, drive='drive0', wait_ready=True):
570         '''Complete a block job and wait for it to finish'''
571         if wait_ready:
572             self.wait_ready(drive=drive)
573
574         result = self.vm.qmp('block-job-complete', device=drive)
575         self.assert_qmp(result, 'return', {})
576
577         event = self.wait_until_completed(drive=drive)
578         self.assert_qmp(event, 'data/type', 'mirror')
579
580     def pause_wait(self, job_id='job0'):
581         with Timeout(1, "Timeout waiting for job to pause"):
582             while True:
583                 result = self.vm.qmp('query-block-jobs')
584                 for job in result['return']:
585                     if job['device'] == job_id and job['paused'] == True and job['busy'] == False:
586                         return job
587
588     def pause_job(self, job_id='job0', wait=True):
589         result = self.vm.qmp('block-job-pause', device=job_id)
590         self.assert_qmp(result, 'return', {})
591         if wait:
592             return self.pause_wait(job_id)
593         return result
594
595
596 def notrun(reason):
597     '''Skip this test suite'''
598     # Each test in qemu-iotests has a number ("seq")
599     seq = os.path.basename(sys.argv[0])
600
601     open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
602     print '%s not run: %s' % (seq, reason)
603     sys.exit(0)
604
605 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
606     assert not (supported_fmts and unsupported_fmts)
607
608     if 'generic' in supported_fmts and \
609             os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
610         # similar to
611         #   _supported_fmt generic
612         # for bash tests
613         return
614
615     not_sup = supported_fmts and (imgfmt not in supported_fmts)
616     if not_sup or (imgfmt in unsupported_fmts):
617         notrun('not suitable for this image format: %s' % imgfmt)
618
619 def verify_protocol(supported=[], unsupported=[]):
620     assert not (supported and unsupported)
621
622     if 'generic' in supported:
623         return
624
625     not_sup = supported and (imgproto not in supported)
626     if not_sup or (imgproto in unsupported):
627         notrun('not suitable for this protocol: %s' % imgproto)
628
629 def verify_platform(supported_oses=['linux']):
630     if True not in [sys.platform.startswith(x) for x in supported_oses]:
631         notrun('not suitable for this OS: %s' % sys.platform)
632
633 def verify_cache_mode(supported_cache_modes=[]):
634     if supported_cache_modes and (cachemode not in supported_cache_modes):
635         notrun('not suitable for this cache mode: %s' % cachemode)
636
637 def supports_quorum():
638     return 'quorum' in qemu_img_pipe('--help')
639
640 def verify_quorum():
641     '''Skip test suite if quorum support is not available'''
642     if not supports_quorum():
643         notrun('quorum support missing')
644
645 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
646          unsupported_fmts=[]):
647     '''Run tests'''
648
649     global debug
650
651     # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
652     # indicate that we're not being run via "check". There may be
653     # other things set up by "check" that individual test cases rely
654     # on.
655     if test_dir is None or qemu_default_machine is None:
656         sys.stderr.write('Please run this test via the "check" script\n')
657         sys.exit(os.EX_USAGE)
658
659     debug = '-d' in sys.argv
660     verbosity = 1
661     verify_image_format(supported_fmts, unsupported_fmts)
662     verify_platform(supported_oses)
663     verify_cache_mode(supported_cache_modes)
664
665     # We need to filter out the time taken from the output so that qemu-iotest
666     # can reliably diff the results against master output.
667     import StringIO
668     if debug:
669         output = sys.stdout
670         verbosity = 2
671         sys.argv.remove('-d')
672     else:
673         output = StringIO.StringIO()
674
675     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
676
677     class MyTestRunner(unittest.TextTestRunner):
678         def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
679             unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
680
681     # unittest.main() will use sys.exit() so expect a SystemExit exception
682     try:
683         unittest.main(testRunner=MyTestRunner)
684     finally:
685         if not debug:
686             sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))
This page took 0.064077 seconds and 4 git commands to generate.