]> Git Repo - qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/xtensa/tags/20190228-xtensa' into staging
[qemu.git] / tests / qemu-iotests / iotests.py
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
3 #
4 # Copyright (C) 2012 IBM Corp.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
34
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36 import qtest
37
38
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43     qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
44
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47     qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
48
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51     qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
52
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
55
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
62
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
65
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67                              os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
69
70
71 def qemu_img(*args):
72     '''Run qemu-img and return the exit code'''
73     devnull = open('/dev/null', 'r+')
74     exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75     if exitcode < 0:
76         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77     return exitcode
78
79 def ordered_qmp(qmsg, conv_keys=True):
80     # Dictionaries are not ordered prior to 3.6, therefore:
81     if isinstance(qmsg, list):
82         return [ordered_qmp(atom) for atom in qmsg]
83     if isinstance(qmsg, dict):
84         od = OrderedDict()
85         for k, v in sorted(qmsg.items()):
86             if conv_keys:
87                 k = k.replace('_', '-')
88             od[k] = ordered_qmp(v, conv_keys=False)
89         return od
90     return qmsg
91
92 def qemu_img_create(*args):
93     args = list(args)
94
95     # default luks support
96     if '-f' in args and args[args.index('-f') + 1] == 'luks':
97         if '-o' in args:
98             i = args.index('-o')
99             if 'key-secret' not in args[i + 1]:
100                 args[i + 1].append(luks_default_key_secret_opt)
101                 args.insert(i + 2, '--object')
102                 args.insert(i + 3, luks_default_secret_object)
103         else:
104             args = ['-o', luks_default_key_secret_opt,
105                     '--object', luks_default_secret_object] + args
106
107     args.insert(0, 'create')
108
109     return qemu_img(*args)
110
111 def qemu_img_verbose(*args):
112     '''Run qemu-img without suppressing its output and return the exit code'''
113     exitcode = subprocess.call(qemu_img_args + list(args))
114     if exitcode < 0:
115         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
116     return exitcode
117
118 def qemu_img_pipe(*args):
119     '''Run qemu-img and return its output'''
120     subp = subprocess.Popen(qemu_img_args + list(args),
121                             stdout=subprocess.PIPE,
122                             stderr=subprocess.STDOUT,
123                             universal_newlines=True)
124     exitcode = subp.wait()
125     if exitcode < 0:
126         sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
127     return subp.communicate()[0]
128
129 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
130     args = [ 'info' ]
131     if imgopts:
132         args.append('--image-opts')
133     else:
134         args += [ '-f', imgfmt ]
135     args += extra_args
136     args.append(filename)
137
138     output = qemu_img_pipe(*args)
139     if not filter_path:
140         filter_path = filename
141     log(filter_img_info(output, filter_path))
142
143 def qemu_io(*args):
144     '''Run qemu-io and return the stdout data'''
145     args = qemu_io_args + list(args)
146     subp = subprocess.Popen(args, stdout=subprocess.PIPE,
147                             stderr=subprocess.STDOUT,
148                             universal_newlines=True)
149     exitcode = subp.wait()
150     if exitcode < 0:
151         sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
152     return subp.communicate()[0]
153
154 def qemu_io_silent(*args):
155     '''Run qemu-io and return the exit code, suppressing stdout'''
156     args = qemu_io_args + list(args)
157     exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
158     if exitcode < 0:
159         sys.stderr.write('qemu-io received signal %i: %s\n' %
160                          (-exitcode, ' '.join(args)))
161     return exitcode
162
163
164 class QemuIoInteractive:
165     def __init__(self, *args):
166         self.args = qemu_io_args + list(args)
167         self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
168                                    stdout=subprocess.PIPE,
169                                    stderr=subprocess.STDOUT,
170                                    universal_newlines=True)
171         assert self._p.stdout.read(9) == 'qemu-io> '
172
173     def close(self):
174         self._p.communicate('q\n')
175
176     def _read_output(self):
177         pattern = 'qemu-io> '
178         n = len(pattern)
179         pos = 0
180         s = []
181         while pos != n:
182             c = self._p.stdout.read(1)
183             # check unexpected EOF
184             assert c != ''
185             s.append(c)
186             if c == pattern[pos]:
187                 pos += 1
188             else:
189                 pos = 0
190
191         return ''.join(s[:-n])
192
193     def cmd(self, cmd):
194         # quit command is in close(), '\n' is added automatically
195         assert '\n' not in cmd
196         cmd = cmd.strip()
197         assert cmd != 'q' and cmd != 'quit'
198         self._p.stdin.write(cmd + '\n')
199         self._p.stdin.flush()
200         return self._read_output()
201
202
203 def qemu_nbd(*args):
204     '''Run qemu-nbd in daemon mode and return the parent's exit code'''
205     return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
206
207 def qemu_nbd_pipe(*args):
208     '''Run qemu-nbd in daemon mode and return both the parent's exit code
209        and its output'''
210     subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
211                             stdout=subprocess.PIPE,
212                             stderr=subprocess.STDOUT,
213                             universal_newlines=True)
214     exitcode = subp.wait()
215     if exitcode < 0:
216         sys.stderr.write('qemu-nbd received signal %i: %s\n' %
217                          (-exitcode,
218                           ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
219     return exitcode, subp.communicate()[0]
220
221 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
222     '''Return True if two image files are identical'''
223     return qemu_img('compare', '-f', fmt1,
224                     '-F', fmt2, img1, img2) == 0
225
226 def create_image(name, size):
227     '''Create a fully-allocated raw image with sector markers'''
228     file = open(name, 'wb')
229     i = 0
230     while i < size:
231         sector = struct.pack('>l504xl', i // 512, i // 512)
232         file.write(sector)
233         i = i + 512
234     file.close()
235
236 def image_size(img):
237     '''Return image's virtual size'''
238     r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
239     return json.loads(r)['virtual-size']
240
241 def is_str(val):
242     if sys.version_info.major >= 3:
243         return isinstance(val, str)
244     else:
245         return isinstance(val, str) or isinstance(val, unicode)
246
247 test_dir_re = re.compile(r"%s" % test_dir)
248 def filter_test_dir(msg):
249     return test_dir_re.sub("TEST_DIR", msg)
250
251 win32_re = re.compile(r"\r")
252 def filter_win32(msg):
253     return win32_re.sub("", msg)
254
255 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
256 def filter_qemu_io(msg):
257     msg = filter_win32(msg)
258     return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
259
260 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
261 def filter_chown(msg):
262     return chown_re.sub("chown UID:GID", msg)
263
264 def filter_qmp_event(event):
265     '''Filter a QMP event dict'''
266     event = dict(event)
267     if 'timestamp' in event:
268         event['timestamp']['seconds'] = 'SECS'
269         event['timestamp']['microseconds'] = 'USECS'
270     return event
271
272 def filter_qmp(qmsg, filter_fn):
273     '''Given a string filter, filter a QMP object's values.
274     filter_fn takes a (key, value) pair.'''
275     # Iterate through either lists or dicts;
276     if isinstance(qmsg, list):
277         items = enumerate(qmsg)
278     else:
279         items = qmsg.items()
280
281     for k, v in items:
282         if isinstance(v, list) or isinstance(v, dict):
283             qmsg[k] = filter_qmp(v, filter_fn)
284         else:
285             qmsg[k] = filter_fn(k, v)
286     return qmsg
287
288 def filter_testfiles(msg):
289     prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
290     return msg.replace(prefix, 'TEST_DIR/PID-')
291
292 def filter_qmp_testfiles(qmsg):
293     def _filter(key, value):
294         if is_str(value):
295             return filter_testfiles(value)
296         return value
297     return filter_qmp(qmsg, _filter)
298
299 def filter_generated_node_ids(msg):
300     return re.sub("#block[0-9]+", "NODE_NAME", msg)
301
302 def filter_img_info(output, filename):
303     lines = []
304     for line in output.split('\n'):
305         if 'disk size' in line or 'actual-size' in line:
306             continue
307         line = line.replace(filename, 'TEST_IMG') \
308                    .replace(imgfmt, 'IMGFMT')
309         line = re.sub('iters: [0-9]+', 'iters: XXX', line)
310         line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
311         line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
312         lines.append(line)
313     return '\n'.join(lines)
314
315 def filter_imgfmt(msg):
316     return msg.replace(imgfmt, 'IMGFMT')
317
318 def filter_qmp_imgfmt(qmsg):
319     def _filter(key, value):
320         if is_str(value):
321             return filter_imgfmt(value)
322         return value
323     return filter_qmp(qmsg, _filter)
324
325 def log(msg, filters=[], indent=None):
326     '''Logs either a string message or a JSON serializable message (like QMP).
327     If indent is provided, JSON serializable messages are pretty-printed.'''
328     for flt in filters:
329         msg = flt(msg)
330     if isinstance(msg, dict) or isinstance(msg, list):
331         # Python < 3.4 needs to know not to add whitespace when pretty-printing:
332         separators = (', ', ': ') if indent is None else (',', ': ')
333         # Don't sort if it's already sorted
334         do_sort = not isinstance(msg, OrderedDict)
335         print(json.dumps(msg, sort_keys=do_sort,
336                          indent=indent, separators=separators))
337     else:
338         print(msg)
339
340 class Timeout:
341     def __init__(self, seconds, errmsg = "Timeout"):
342         self.seconds = seconds
343         self.errmsg = errmsg
344     def __enter__(self):
345         signal.signal(signal.SIGALRM, self.timeout)
346         signal.setitimer(signal.ITIMER_REAL, self.seconds)
347         return self
348     def __exit__(self, type, value, traceback):
349         signal.setitimer(signal.ITIMER_REAL, 0)
350         return False
351     def timeout(self, signum, frame):
352         raise Exception(self.errmsg)
353
354
355 class FilePath(object):
356     '''An auto-generated filename that cleans itself up.
357
358     Use this context manager to generate filenames and ensure that the file
359     gets deleted::
360
361         with TestFilePath('test.img') as img_path:
362             qemu_img('create', img_path, '1G')
363         # migration_sock_path is automatically deleted
364     '''
365     def __init__(self, name):
366         filename = '{0}-{1}'.format(os.getpid(), name)
367         self.path = os.path.join(test_dir, filename)
368
369     def __enter__(self):
370         return self.path
371
372     def __exit__(self, exc_type, exc_val, exc_tb):
373         try:
374             os.remove(self.path)
375         except OSError:
376             pass
377         return False
378
379
380 def file_path_remover():
381     for path in reversed(file_path_remover.paths):
382         try:
383             os.remove(path)
384         except OSError:
385             pass
386
387
388 def file_path(*names):
389     ''' Another way to get auto-generated filename that cleans itself up.
390
391     Use is as simple as:
392
393     img_a, img_b = file_path('a.img', 'b.img')
394     sock = file_path('socket')
395     '''
396
397     if not hasattr(file_path_remover, 'paths'):
398         file_path_remover.paths = []
399         atexit.register(file_path_remover)
400
401     paths = []
402     for name in names:
403         filename = '{0}-{1}'.format(os.getpid(), name)
404         path = os.path.join(test_dir, filename)
405         file_path_remover.paths.append(path)
406         paths.append(path)
407
408     return paths[0] if len(paths) == 1 else paths
409
410 def remote_filename(path):
411     if imgproto == 'file':
412         return path
413     elif imgproto == 'ssh':
414         return "ssh://127.0.0.1%s" % (path)
415     else:
416         raise Exception("Protocol %s not supported" % (imgproto))
417
418 class VM(qtest.QEMUQtestMachine):
419     '''A QEMU VM'''
420
421     def __init__(self, path_suffix=''):
422         name = "qemu%s-%d" % (path_suffix, os.getpid())
423         super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
424                                  test_dir=test_dir,
425                                  socket_scm_helper=socket_scm_helper)
426         self._num_drives = 0
427
428     def add_object(self, opts):
429         self._args.append('-object')
430         self._args.append(opts)
431         return self
432
433     def add_device(self, opts):
434         self._args.append('-device')
435         self._args.append(opts)
436         return self
437
438     def add_drive_raw(self, opts):
439         self._args.append('-drive')
440         self._args.append(opts)
441         return self
442
443     def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
444         '''Add a virtio-blk drive to the VM'''
445         options = ['if=%s' % interface,
446                    'id=drive%d' % self._num_drives]
447
448         if path is not None:
449             options.append('file=%s' % path)
450             options.append('format=%s' % format)
451             options.append('cache=%s' % cachemode)
452
453         if opts:
454             options.append(opts)
455
456         if format == 'luks' and 'key-secret' not in opts:
457             # default luks support
458             if luks_default_secret_object not in self._args:
459                 self.add_object(luks_default_secret_object)
460
461             options.append(luks_default_key_secret_opt)
462
463         self._args.append('-drive')
464         self._args.append(','.join(options))
465         self._num_drives += 1
466         return self
467
468     def add_blockdev(self, opts):
469         self._args.append('-blockdev')
470         if isinstance(opts, str):
471             self._args.append(opts)
472         else:
473             self._args.append(','.join(opts))
474         return self
475
476     def add_incoming(self, addr):
477         self._args.append('-incoming')
478         self._args.append(addr)
479         return self
480
481     def pause_drive(self, drive, event=None):
482         '''Pause drive r/w operations'''
483         if not event:
484             self.pause_drive(drive, "read_aio")
485             self.pause_drive(drive, "write_aio")
486             return
487         self.qmp('human-monitor-command',
488                     command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
489
490     def resume_drive(self, drive):
491         self.qmp('human-monitor-command',
492                     command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
493
494     def hmp_qemu_io(self, drive, cmd):
495         '''Write to a given drive using an HMP command'''
496         return self.qmp('human-monitor-command',
497                         command_line='qemu-io %s "%s"' % (drive, cmd))
498
499     def flatten_qmp_object(self, obj, output=None, basestr=''):
500         if output is None:
501             output = dict()
502         if isinstance(obj, list):
503             for i in range(len(obj)):
504                 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
505         elif isinstance(obj, dict):
506             for key in obj:
507                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
508         else:
509             output[basestr[:-1]] = obj # Strip trailing '.'
510         return output
511
512     def qmp_to_opts(self, obj):
513         obj = self.flatten_qmp_object(obj)
514         output_list = list()
515         for key in obj:
516             output_list += [key + '=' + obj[key]]
517         return ','.join(output_list)
518
519     def get_qmp_events_filtered(self, wait=True):
520         result = []
521         for ev in self.get_qmp_events(wait=wait):
522             result.append(filter_qmp_event(ev))
523         return result
524
525     def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
526         full_cmd = OrderedDict((
527             ("execute", cmd),
528             ("arguments", ordered_qmp(kwargs))
529         ))
530         log(full_cmd, filters, indent=indent)
531         result = self.qmp(cmd, **kwargs)
532         log(result, filters, indent=indent)
533         return result
534
535     # Returns None on success, and an error string on failure
536     def run_job(self, job, auto_finalize=True, auto_dismiss=False):
537         error = None
538         while True:
539             for ev in self.get_qmp_events_filtered(wait=True):
540                 if ev['event'] == 'JOB_STATUS_CHANGE':
541                     status = ev['data']['status']
542                     if status == 'aborting':
543                         result = self.qmp('query-jobs')
544                         for j in result['return']:
545                             if j['id'] == job:
546                                 error = j['error']
547                                 log('Job failed: %s' % (j['error']))
548                     elif status == 'pending' and not auto_finalize:
549                         self.qmp_log('job-finalize', id=job)
550                     elif status == 'concluded' and not auto_dismiss:
551                         self.qmp_log('job-dismiss', id=job)
552                     elif status == 'null':
553                         return error
554                 else:
555                     iotests.log(ev)
556
557     def node_info(self, node_name):
558         nodes = self.qmp('query-named-block-nodes')
559         for x in nodes['return']:
560             if x['node-name'] == node_name:
561                 return x
562         return None
563
564
565 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
566
567 class QMPTestCase(unittest.TestCase):
568     '''Abstract base class for QMP test cases'''
569
570     def dictpath(self, d, path):
571         '''Traverse a path in a nested dict'''
572         for component in path.split('/'):
573             m = index_re.match(component)
574             if m:
575                 component, idx = m.groups()
576                 idx = int(idx)
577
578             if not isinstance(d, dict) or component not in d:
579                 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
580             d = d[component]
581
582             if m:
583                 if not isinstance(d, list):
584                     self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
585                 try:
586                     d = d[idx]
587                 except IndexError:
588                     self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
589         return d
590
591     def assert_qmp_absent(self, d, path):
592         try:
593             result = self.dictpath(d, path)
594         except AssertionError:
595             return
596         self.fail('path "%s" has value "%s"' % (path, str(result)))
597
598     def assert_qmp(self, d, path, value):
599         '''Assert that the value for a specific path in a QMP dict matches'''
600         result = self.dictpath(d, path)
601         self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
602
603     def assert_no_active_block_jobs(self):
604         result = self.vm.qmp('query-block-jobs')
605         self.assert_qmp(result, 'return', [])
606
607     def assert_has_block_node(self, node_name=None, file_name=None):
608         """Issue a query-named-block-nodes and assert node_name and/or
609         file_name is present in the result"""
610         def check_equal_or_none(a, b):
611             return a == None or b == None or a == b
612         assert node_name or file_name
613         result = self.vm.qmp('query-named-block-nodes')
614         for x in result["return"]:
615             if check_equal_or_none(x.get("node-name"), node_name) and \
616                     check_equal_or_none(x.get("file"), file_name):
617                 return
618         self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
619                 (node_name, file_name, result))
620
621     def assert_json_filename_equal(self, json_filename, reference):
622         '''Asserts that the given filename is a json: filename and that its
623            content is equal to the given reference object'''
624         self.assertEqual(json_filename[:5], 'json:')
625         self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
626                          self.vm.flatten_qmp_object(reference))
627
628     def cancel_and_wait(self, drive='drive0', force=False, resume=False):
629         '''Cancel a block job and wait for it to finish, returning the event'''
630         result = self.vm.qmp('block-job-cancel', device=drive, force=force)
631         self.assert_qmp(result, 'return', {})
632
633         if resume:
634             self.vm.resume_drive(drive)
635
636         cancelled = False
637         result = None
638         while not cancelled:
639             for event in self.vm.get_qmp_events(wait=True):
640                 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
641                    event['event'] == 'BLOCK_JOB_CANCELLED':
642                     self.assert_qmp(event, 'data/device', drive)
643                     result = event
644                     cancelled = True
645                 elif event['event'] == 'JOB_STATUS_CHANGE':
646                     self.assert_qmp(event, 'data/id', drive)
647
648
649         self.assert_no_active_block_jobs()
650         return result
651
652     def wait_until_completed(self, drive='drive0', check_offset=True):
653         '''Wait for a block job to finish, returning the event'''
654         while True:
655             for event in self.vm.get_qmp_events(wait=True):
656                 if event['event'] == 'BLOCK_JOB_COMPLETED':
657                     self.assert_qmp(event, 'data/device', drive)
658                     self.assert_qmp_absent(event, 'data/error')
659                     if check_offset:
660                         self.assert_qmp(event, 'data/offset', event['data']['len'])
661                     self.assert_no_active_block_jobs()
662                     return event
663                 elif event['event'] == 'JOB_STATUS_CHANGE':
664                     self.assert_qmp(event, 'data/id', drive)
665
666     def wait_ready(self, drive='drive0'):
667         '''Wait until a block job BLOCK_JOB_READY event'''
668         f = {'data': {'type': 'mirror', 'device': drive } }
669         event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
670
671     def wait_ready_and_cancel(self, drive='drive0'):
672         self.wait_ready(drive=drive)
673         event = self.cancel_and_wait(drive=drive)
674         self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
675         self.assert_qmp(event, 'data/type', 'mirror')
676         self.assert_qmp(event, 'data/offset', event['data']['len'])
677
678     def complete_and_wait(self, drive='drive0', wait_ready=True):
679         '''Complete a block job and wait for it to finish'''
680         if wait_ready:
681             self.wait_ready(drive=drive)
682
683         result = self.vm.qmp('block-job-complete', device=drive)
684         self.assert_qmp(result, 'return', {})
685
686         event = self.wait_until_completed(drive=drive)
687         self.assert_qmp(event, 'data/type', 'mirror')
688
689     def pause_wait(self, job_id='job0'):
690         with Timeout(1, "Timeout waiting for job to pause"):
691             while True:
692                 result = self.vm.qmp('query-block-jobs')
693                 found = False
694                 for job in result['return']:
695                     if job['device'] == job_id:
696                         found = True
697                         if job['paused'] == True and job['busy'] == False:
698                             return job
699                         break
700                 assert found
701
702     def pause_job(self, job_id='job0', wait=True):
703         result = self.vm.qmp('block-job-pause', device=job_id)
704         self.assert_qmp(result, 'return', {})
705         if wait:
706             return self.pause_wait(job_id)
707         return result
708
709
710 def notrun(reason):
711     '''Skip this test suite'''
712     # Each test in qemu-iotests has a number ("seq")
713     seq = os.path.basename(sys.argv[0])
714
715     open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
716     print('%s not run: %s' % (seq, reason))
717     sys.exit(0)
718
719 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
720     assert not (supported_fmts and unsupported_fmts)
721
722     if 'generic' in supported_fmts and \
723             os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
724         # similar to
725         #   _supported_fmt generic
726         # for bash tests
727         return
728
729     not_sup = supported_fmts and (imgfmt not in supported_fmts)
730     if not_sup or (imgfmt in unsupported_fmts):
731         notrun('not suitable for this image format: %s' % imgfmt)
732
733 def verify_protocol(supported=[], unsupported=[]):
734     assert not (supported and unsupported)
735
736     if 'generic' in supported:
737         return
738
739     not_sup = supported and (imgproto not in supported)
740     if not_sup or (imgproto in unsupported):
741         notrun('not suitable for this protocol: %s' % imgproto)
742
743 def verify_platform(supported_oses=['linux']):
744     if True not in [sys.platform.startswith(x) for x in supported_oses]:
745         notrun('not suitable for this OS: %s' % sys.platform)
746
747 def verify_cache_mode(supported_cache_modes=[]):
748     if supported_cache_modes and (cachemode not in supported_cache_modes):
749         notrun('not suitable for this cache mode: %s' % cachemode)
750
751 def supports_quorum():
752     return 'quorum' in qemu_img_pipe('--help')
753
754 def verify_quorum():
755     '''Skip test suite if quorum support is not available'''
756     if not supports_quorum():
757         notrun('quorum support missing')
758
759 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
760          unsupported_fmts=[]):
761     '''Run tests'''
762
763     global debug
764
765     # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
766     # indicate that we're not being run via "check". There may be
767     # other things set up by "check" that individual test cases rely
768     # on.
769     if test_dir is None or qemu_default_machine is None:
770         sys.stderr.write('Please run this test via the "check" script\n')
771         sys.exit(os.EX_USAGE)
772
773     debug = '-d' in sys.argv
774     verbosity = 1
775     verify_image_format(supported_fmts, unsupported_fmts)
776     verify_platform(supported_oses)
777     verify_cache_mode(supported_cache_modes)
778
779     if debug:
780         output = sys.stdout
781         verbosity = 2
782         sys.argv.remove('-d')
783     else:
784         # We need to filter out the time taken from the output so that
785         # qemu-iotest can reliably diff the results against master output.
786         if sys.version_info.major >= 3:
787             output = io.StringIO()
788         else:
789             # io.StringIO is for unicode strings, which is not what
790             # 2.x's test runner emits.
791             output = io.BytesIO()
792
793     logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
794
795     class MyTestRunner(unittest.TextTestRunner):
796         def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
797             unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
798
799     # unittest.main() will use sys.exit() so expect a SystemExit exception
800     try:
801         unittest.main(testRunner=MyTestRunner)
802     finally:
803         if not debug:
804             sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))
This page took 0.070446 seconds and 4 git commands to generate.