]> Git Repo - qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/ehabkost/tags/x86-next-pull-request' into staging
[qemu.git] / tests / qemu-iotests / iotests.py
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
3 #
4 # Copyright (C) 2012 IBM Corp.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33
34 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
35 import qtest
36
37
38 # This will not work if arguments contain spaces but is necessary if we
39 # want to support the override options that ./check supports.
40 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
41 if os.environ.get('QEMU_IMG_OPTIONS'):
42     qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
43
44 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
45 if os.environ.get('QEMU_IO_OPTIONS'):
46     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
47
48 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
49 if os.environ.get('QEMU_NBD_OPTIONS'):
50     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
51
52 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
53 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
54
55 imgfmt = os.environ.get('IMGFMT', 'raw')
56 imgproto = os.environ.get('IMGPROTO', 'file')
57 test_dir = os.environ.get('TEST_DIR')
58 output_dir = os.environ.get('OUTPUT_DIR', '.')
59 cachemode = os.environ.get('CACHEMODE')
60 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
61
62 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
63 debug = False
64
65 luks_default_secret_object = 'secret,id=keysec0,data=' + \
66                              os.environ['IMGKEYSECRET']
67 luks_default_key_secret_opt = 'key-secret=keysec0'
68
69
70 def qemu_img(*args):
71     '''Run qemu-img and return the exit code'''
72     devnull = open('/dev/null', 'r+')
73     exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
74     if exitcode < 0:
75         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
76     return exitcode
77
78 def qemu_img_create(*args):
79     args = list(args)
80
81     # default luks support
82     if '-f' in args and args[args.index('-f') + 1] == 'luks':
83         if '-o' in args:
84             i = args.index('-o')
85             if 'key-secret' not in args[i + 1]:
86                 args[i + 1].append(luks_default_key_secret_opt)
87                 args.insert(i + 2, '--object')
88                 args.insert(i + 3, luks_default_secret_object)
89         else:
90             args = ['-o', luks_default_key_secret_opt,
91                     '--object', luks_default_secret_object] + args
92
93     args.insert(0, 'create')
94
95     return qemu_img(*args)
96
97 def qemu_img_verbose(*args):
98     '''Run qemu-img without suppressing its output and return the exit code'''
99     exitcode = subprocess.call(qemu_img_args + list(args))
100     if exitcode < 0:
101         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
102     return exitcode
103
104 def qemu_img_pipe(*args):
105     '''Run qemu-img and return its output'''
106     subp = subprocess.Popen(qemu_img_args + list(args),
107                             stdout=subprocess.PIPE,
108                             stderr=subprocess.STDOUT,
109                             universal_newlines=True)
110     exitcode = subp.wait()
111     if exitcode < 0:
112         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
113     return subp.communicate()[0]
114
115 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
116     args = [ 'info' ]
117     if imgopts:
118         args.append('--image-opts')
119     else:
120         args += [ '-f', imgfmt ]
121     args += extra_args
122     args.append(filename)
123
124     output = qemu_img_pipe(*args)
125     if not filter_path:
126         filter_path = filename
127     log(filter_img_info(output, filter_path))
128
129 def qemu_io(*args):
130     '''Run qemu-io and return the stdout data'''
131     args = qemu_io_args + list(args)
132     subp = subprocess.Popen(args, stdout=subprocess.PIPE,
133                             stderr=subprocess.STDOUT,
134                             universal_newlines=True)
135     exitcode = subp.wait()
136     if exitcode < 0:
137         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
138     return subp.communicate()[0]
139
140 def qemu_io_silent(*args):
141     '''Run qemu-io and return the exit code, suppressing stdout'''
142     args = qemu_io_args + list(args)
143     exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
144     if exitcode < 0:
145         sys.stderr.write('qemu-io received signal %i: %s\n' %
146                          (-exitcode, ' '.join(args)))
147     return exitcode
148
149
150 class QemuIoInteractive:
151     def __init__(self, *args):
152         self.args = qemu_io_args + list(args)
153         self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
154                                    stdout=subprocess.PIPE,
155                                    stderr=subprocess.STDOUT,
156                                    universal_newlines=True)
157         assert self._p.stdout.read(9) == 'qemu-io> '
158
159     def close(self):
160         self._p.communicate('q\n')
161
162     def _read_output(self):
163         pattern = 'qemu-io> '
164         n = len(pattern)
165         pos = 0
166         s = []
167         while pos != n:
168             c = self._p.stdout.read(1)
169             # check unexpected EOF
170             assert c != ''
171             s.append(c)
172             if c == pattern[pos]:
173                 pos += 1
174             else:
175                 pos = 0
176
177         return ''.join(s[:-n])
178
179     def cmd(self, cmd):
180         # quit command is in close(), '\n' is added automatically
181         assert '\n' not in cmd
182         cmd = cmd.strip()
183         assert cmd != 'q' and cmd != 'quit'
184         self._p.stdin.write(cmd + '\n')
185         self._p.stdin.flush()
186         return self._read_output()
187
188
189 def qemu_nbd(*args):
190     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
191     return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
192
193 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
194     '''Return True if two image files are identical'''
195     return qemu_img('compare', '-f', fmt1,
196                     '-F', fmt2, img1, img2) == 0
197
198 def create_image(name, size):
199     '''Create a fully-allocated raw image with sector markers'''
200     file = open(name, 'wb')
201     i = 0
202     while i < size:
203         sector = struct.pack('>l504xl', i // 512, i // 512)
204         file.write(sector)
205         i = i + 512
206     file.close()
207
208 def image_size(img):
209     '''Return image's virtual size'''
210     r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
211     return json.loads(r)['virtual-size']
212
213 test_dir_re = re.compile(r"%s" % test_dir)
214 def filter_test_dir(msg):
215     return test_dir_re.sub("TEST_DIR", msg)
216
217 win32_re = re.compile(r"\r")
218 def filter_win32(msg):
219     return win32_re.sub("", msg)
220
221 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
222 def filter_qemu_io(msg):
223     msg = filter_win32(msg)
224     return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
225
226 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
227 def filter_chown(msg):
228     return chown_re.sub("chown UID:GID", msg)
229
230 def filter_qmp_event(event):
231     '''Filter a QMP event dict'''
232     event = dict(event)
233     if 'timestamp' in event:
234         event['timestamp']['seconds'] = 'SECS'
235         event['timestamp']['microseconds'] = 'USECS'
236     return event
237
238 def filter_testfiles(msg):
239     prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
240     return msg.replace(prefix, 'TEST_DIR/PID-')
241
242 def filter_img_info(output, filename):
243     lines = []
244     for line in output.split('\n'):
245         if 'disk size' in line or 'actual-size' in line:
246             continue
247         line = line.replace(filename, 'TEST_IMG') \
248                    .replace(imgfmt, 'IMGFMT')
249         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
250         line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
251         lines.append(line)
252     return '\n'.join(lines)
253
254 def log(msg, filters=[]):
255     for flt in filters:
256         msg = flt(msg)
257     if type(msg) is dict or type(msg) is list:
258         print(json.dumps(msg, sort_keys=True))
259     else:
260         print(msg)
261
262 class Timeout:
263     def __init__(self, seconds, errmsg = "Timeout"):
264         self.seconds = seconds
265         self.errmsg = errmsg
266     def __enter__(self):
267         signal.signal(signal.SIGALRM, self.timeout)
268         signal.setitimer(signal.ITIMER_REAL, self.seconds)
269         return self
270     def __exit__(self, type, value, traceback):
271         signal.setitimer(signal.ITIMER_REAL, 0)
272         return False
273     def timeout(self, signum, frame):
274         raise Exception(self.errmsg)
275
276
277 class FilePath(object):
278     '''An auto-generated filename that cleans itself up.
279
280     Use this context manager to generate filenames and ensure that the file
281     gets deleted::
282
283         with TestFilePath('test.img') as img_path:
284             qemu_img('create', img_path, '1G')
285         # migration_sock_path is automatically deleted
286     '''
287     def __init__(self, name):
288         filename = '{0}-{1}'.format(os.getpid(), name)
289         self.path = os.path.join(test_dir, filename)
290
291     def __enter__(self):
292         return self.path
293
294     def __exit__(self, exc_type, exc_val, exc_tb):
295         try:
296             os.remove(self.path)
297         except OSError:
298             pass
299         return False
300
301
302 def file_path_remover():
303     for path in reversed(file_path_remover.paths):
304         try:
305             os.remove(path)
306         except OSError:
307             pass
308
309
310 def file_path(*names):
311     ''' Another way to get auto-generated filename that cleans itself up.
312
313     Use is as simple as:
314
315     img_a, img_b = file_path('a.img', 'b.img')
316     sock = file_path('socket')
317     '''
318
319     if not hasattr(file_path_remover, 'paths'):
320         file_path_remover.paths = []
321         atexit.register(file_path_remover)
322
323     paths = []
324     for name in names:
325         filename = '{0}-{1}'.format(os.getpid(), name)
326         path = os.path.join(test_dir, filename)
327         file_path_remover.paths.append(path)
328         paths.append(path)
329
330     return paths[0] if len(paths) == 1 else paths
331
332 def remote_filename(path):
333     if imgproto == 'file':
334         return path
335     elif imgproto == 'ssh':
336         return "ssh://127.0.0.1%s" % (path)
337     else:
338         raise Exception("Protocol %s not supported" % (imgproto))
339
340 class VM(qtest.QEMUQtestMachine):
341     '''A QEMU VM'''
342
343     def __init__(self, path_suffix=''):
344         name = "qemu%s-%d" % (path_suffix, os.getpid())
345         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
346                                  test_dir=test_dir,
347                                  socket_scm_helper=socket_scm_helper)
348         self._num_drives = 0
349
350     def add_object(self, opts):
351         self._args.append('-object')
352         self._args.append(opts)
353         return self
354
355     def add_device(self, opts):
356         self._args.append('-device')
357         self._args.append(opts)
358         return self
359
360     def add_drive_raw(self, opts):
361         self._args.append('-drive')
362         self._args.append(opts)
363         return self
364
365     def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
366         '''Add a virtio-blk drive to the VM'''
367         options = ['if=%s' % interface,
368                    'id=drive%d' % self._num_drives]
369
370         if path is not None:
371             options.append('file=%s' % path)
372             options.append('format=%s' % format)
373             options.append('cache=%s' % cachemode)
374
375         if opts:
376             options.append(opts)
377
378         if format == 'luks' and 'key-secret' not in opts:
379             # default luks support
380             if luks_default_secret_object not in self._args:
381                 self.add_object(luks_default_secret_object)
382
383             options.append(luks_default_key_secret_opt)
384
385         self._args.append('-drive')
386         self._args.append(','.join(options))
387         self._num_drives += 1
388         return self
389
390     def add_blockdev(self, opts):
391         self._args.append('-blockdev')
392         if isinstance(opts, str):
393             self._args.append(opts)
394         else:
395             self._args.append(','.join(opts))
396         return self
397
398     def add_incoming(self, addr):
399         self._args.append('-incoming')
400         self._args.append(addr)
401         return self
402
403     def pause_drive(self, drive, event=None):
404         '''Pause drive r/w operations'''
405         if not event:
406             self.pause_drive(drive, "read_aio")
407             self.pause_drive(drive, "write_aio")
408             return
409         self.qmp('human-monitor-command',
410                     command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
411
412     def resume_drive(self, drive):
413         self.qmp('human-monitor-command',
414                     command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
415
416     def hmp_qemu_io(self, drive, cmd):
417         '''Write to a given drive using an HMP command'''
418         return self.qmp('human-monitor-command',
419                         command_line='qemu-io %s "%s"' % (drive, cmd))
420
421     def flatten_qmp_object(self, obj, output=None, basestr=''):
422         if output is None:
423             output = dict()
424         if isinstance(obj, list):
425             for i in range(len(obj)):
426                 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
427         elif isinstance(obj, dict):
428             for key in obj:
429                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
430         else:
431             output[basestr[:-1]] = obj # Strip trailing '.'
432         return output
433
434     def qmp_to_opts(self, obj):
435         obj = self.flatten_qmp_object(obj)
436         output_list = list()
437         for key in obj:
438             output_list += [key + '=' + obj[key]]
439         return ','.join(output_list)
440
441     def get_qmp_events_filtered(self, wait=True):
442         result = []
443         for ev in self.get_qmp_events(wait=wait):
444             result.append(filter_qmp_event(ev))
445         return result
446
447     def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
448         logmsg = '{"execute": "%s", "arguments": %s}' % \
449             (cmd, json.dumps(kwargs, sort_keys=True))
450         log(logmsg, filters)
451         result = self.qmp(cmd, **kwargs)
452         log(json.dumps(result, sort_keys=True), filters)
453         return result
454
455     def run_job(self, job, auto_finalize=True, auto_dismiss=False):
456         while True:
457             for ev in self.get_qmp_events_filtered(wait=True):
458                 if ev['event'] == 'JOB_STATUS_CHANGE':
459                     status = ev['data']['status']
460                     if status == 'aborting':
461                         result = self.qmp('query-jobs')
462                         for j in result['return']:
463                             if j['id'] == job:
464                                 log('Job failed: %s' % (j['error']))
465                     elif status == 'pending' and not auto_finalize:
466                         self.qmp_log('job-finalize', id=job)
467                     elif status == 'concluded' and not auto_dismiss:
468                         self.qmp_log('job-dismiss', id=job)
469                     elif status == 'null':
470                         return
471                 else:
472                     iotests.log(ev)
473
474
475 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
476
477 class QMPTestCase(unittest.TestCase):
478     '''Abstract base class for QMP test cases'''
479
480     def dictpath(self, d, path):
481         '''Traverse a path in a nested dict'''
482         for component in path.split('/'):
483             m = index_re.match(component)
484             if m:
485                 component, idx = m.groups()
486                 idx = int(idx)
487
488             if not isinstance(d, dict) or component not in d:
489                 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
490             d = d[component]
491
492             if m:
493                 if not isinstance(d, list):
494                     self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
495                 try:
496                     d = d[idx]
497                 except IndexError:
498                     self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
499         return d
500
501     def assert_qmp_absent(self, d, path):
502         try:
503             result = self.dictpath(d, path)
504         except AssertionError:
505             return
506         self.fail('path "%s" has value "%s"' % (path, str(result)))
507
508     def assert_qmp(self, d, path, value):
509         '''Assert that the value for a specific path in a QMP dict matches'''
510         result = self.dictpath(d, path)
511         self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
512
513     def assert_no_active_block_jobs(self):
514         result = self.vm.qmp('query-block-jobs')
515         self.assert_qmp(result, 'return', [])
516
517     def assert_has_block_node(self, node_name=None, file_name=None):
518         """Issue a query-named-block-nodes and assert node_name and/or
519         file_name is present in the result"""
520         def check_equal_or_none(a, b):
521             return a == None or b == None or a == b
522         assert node_name or file_name
523         result = self.vm.qmp('query-named-block-nodes')
524         for x in result["return"]:
525             if check_equal_or_none(x.get("node-name"), node_name) and \
526                     check_equal_or_none(x.get("file"), file_name):
527                 return
528         self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
529                 (node_name, file_name, result))
530
531     def assert_json_filename_equal(self, json_filename, reference):
532         '''Asserts that the given filename is a json: filename and that its
533            content is equal to the given reference object'''
534         self.assertEqual(json_filename[:5], 'json:')
535         self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
536                          self.vm.flatten_qmp_object(reference))
537
538     def cancel_and_wait(self, drive='drive0', force=False, resume=False):
539         '''Cancel a block job and wait for it to finish, returning the event'''
540         result = self.vm.qmp('block-job-cancel', device=drive, force=force)
541         self.assert_qmp(result, 'return', {})
542
543         if resume:
544             self.vm.resume_drive(drive)
545
546         cancelled = False
547         result = None
548         while not cancelled:
549             for event in self.vm.get_qmp_events(wait=True):
550                 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
551                    event['event'] == 'BLOCK_JOB_CANCELLED':
552                     self.assert_qmp(event, 'data/device', drive)
553                     result = event
554                     cancelled = True
555                 elif event['event'] == 'JOB_STATUS_CHANGE':
556                     self.assert_qmp(event, 'data/id', drive)
557
558
559         self.assert_no_active_block_jobs()
560         return result
561
562     def wait_until_completed(self, drive='drive0', check_offset=True):
563         '''Wait for a block job to finish, returning the event'''
564         while True:
565             for event in self.vm.get_qmp_events(wait=True):
566                 if event['event'] == 'BLOCK_JOB_COMPLETED':
567                     self.assert_qmp(event, 'data/device', drive)
568                     self.assert_qmp_absent(event, 'data/error')
569                     if check_offset:
570                         self.assert_qmp(event, 'data/offset', event['data']['len'])
571                     self.assert_no_active_block_jobs()
572                     return event
573                 elif event['event'] == 'JOB_STATUS_CHANGE':
574                     self.assert_qmp(event, 'data/id', drive)
575
576     def wait_ready(self, drive='drive0'):
577         '''Wait until a block job BLOCK_JOB_READY event'''
578         f = {'data': {'type': 'mirror', 'device': drive } }
579         event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
580
581     def wait_ready_and_cancel(self, drive='drive0'):
582         self.wait_ready(drive=drive)
583         event = self.cancel_and_wait(drive=drive)
584         self.assertEquals(event['event'], 'BLOCK_JOB_COMPLETED')
585         self.assert_qmp(event, 'data/type', 'mirror')
586         self.assert_qmp(event, 'data/offset', event['data']['len'])
587
588     def complete_and_wait(self, drive='drive0', wait_ready=True):
589         '''Complete a block job and wait for it to finish'''
590         if wait_ready:
591             self.wait_ready(drive=drive)
592
593         result = self.vm.qmp('block-job-complete', device=drive)
594         self.assert_qmp(result, 'return', {})
595
596         event = self.wait_until_completed(drive=drive)
597         self.assert_qmp(event, 'data/type', 'mirror')
598
599     def pause_wait(self, job_id='job0'):
600         with Timeout(1, "Timeout waiting for job to pause"):
601             while True:
602                 result = self.vm.qmp('query-block-jobs')
603                 found = False
604                 for job in result['return']:
605                     if job['device'] == job_id:
606                         found = True
607                         if job['paused'] == True and job['busy'] == False:
608                             return job
609                         break
610                 assert found
611
612     def pause_job(self, job_id='job0', wait=True):
613         result = self.vm.qmp('block-job-pause', device=job_id)
614         self.assert_qmp(result, 'return', {})
615         if wait:
616             return self.pause_wait(job_id)
617         return result
618
619
620 def notrun(reason):
621     '''Skip this test suite'''
622     # Each test in qemu-iotests has a number ("seq")
623     seq = os.path.basename(sys.argv[0])
624
625     open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
626     print('%s not run: %s' % (seq, reason))
627     sys.exit(0)
628
629 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
630     assert not (supported_fmts and unsupported_fmts)
631
632     if 'generic' in supported_fmts and \
633             os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
634         # similar to
635         #   _supported_fmt generic
636         # for bash tests
637         return
638
639     not_sup = supported_fmts and (imgfmt not in supported_fmts)
640     if not_sup or (imgfmt in unsupported_fmts):
641         notrun('not suitable for this image format: %s' % imgfmt)
642
643 def verify_protocol(supported=[], unsupported=[]):
644     assert not (supported and unsupported)
645
646     if 'generic' in supported:
647         return
648
649     not_sup = supported and (imgproto not in supported)
650     if not_sup or (imgproto in unsupported):
651         notrun('not suitable for this protocol: %s' % imgproto)
652
653 def verify_platform(supported_oses=['linux']):
654     if True not in [sys.platform.startswith(x) for x in supported_oses]:
655         notrun('not suitable for this OS: %s' % sys.platform)
656
657 def verify_cache_mode(supported_cache_modes=[]):
658     if supported_cache_modes and (cachemode not in supported_cache_modes):
659         notrun('not suitable for this cache mode: %s' % cachemode)
660
661 def supports_quorum():
662     return 'quorum' in qemu_img_pipe('--help')
663
664 def verify_quorum():
665     '''Skip test suite if quorum support is not available'''
666     if not supports_quorum():
667         notrun('quorum support missing')
668
669 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
670          unsupported_fmts=[]):
671     '''Run tests'''
672
673     global debug
674
675     # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
676     # indicate that we're not being run via "check". There may be
677     # other things set up by "check" that individual test cases rely
678     # on.
679     if test_dir is None or qemu_default_machine is None:
680         sys.stderr.write('Please run this test via the "check" script\n')
681         sys.exit(os.EX_USAGE)
682
683     debug = '-d' in sys.argv
684     verbosity = 1
685     verify_image_format(supported_fmts, unsupported_fmts)
686     verify_platform(supported_oses)
687     verify_cache_mode(supported_cache_modes)
688
689     if debug:
690         output = sys.stdout
691         verbosity = 2
692         sys.argv.remove('-d')
693     else:
694         # We need to filter out the time taken from the output so that
695         # qemu-iotest can reliably diff the results against master output.
696         if sys.version_info.major >= 3:
697             output = io.StringIO()
698         else:
699             # io.StringIO is for unicode strings, which is not what
700             # 2.x's test runner emits.
701             output = io.BytesIO()
702
703     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
704
705     class MyTestRunner(unittest.TextTestRunner):
706         def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
707             unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
708
709     # unittest.main() will use sys.exit() so expect a SystemExit exception
710     try:
711         unittest.main(testRunner=MyTestRunner)
712     finally:
713         if not debug:
714             sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))
This page took 0.065158 seconds and 4 git commands to generate.