]>
Commit | Line | Data |
---|---|---|
11ae93ee SG |
1 | #!/usr/bin/env python |
2 | # SPDX-License-Identifier: GPL-2.0+ | |
3 | # | |
4 | # Modified by: Corey Goldberg, 2013 | |
5 | # | |
6 | # Original code from: | |
7 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) | |
8 | # Copyright (C) 2005-2011 Canonical Ltd | |
9 | ||
10 | """Python testtools extension for running unittest suites concurrently. | |
11 | ||
12 | The `testtools` project provides a ConcurrentTestSuite class, but does | |
13 | not provide a `make_tests` implementation needed to use it. | |
14 | ||
15 | This allows you to parallelize a test run across a configurable number | |
16 | of worker processes. While this can speed up CPU-bound test runs, it is | |
17 | mainly useful for IO-bound tests that spend most of their time waiting for | |
18 | data to arrive from someplace else and can benefit from concurrency. | |
19 | ||
20 | Unix only. | |
21 | """ | |
22 | ||
23 | import os | |
24 | import sys | |
25 | import traceback | |
26 | import unittest | |
27 | from itertools import cycle | |
28 | from multiprocessing import cpu_count | |
29 | ||
30 | from subunit import ProtocolTestCase, TestProtocolClient | |
31 | from subunit.test_results import AutoTimingTestResultDecorator | |
32 | ||
33 | from testtools import ConcurrentTestSuite, iterate_tests | |
34 | ||
35 | ||
# Public names exported by ``from <module> import *``.  The name must be
# spelled ``__all__`` (two leading underscores) for Python to honour it.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Number of CPUs reported by multiprocessing; used as the default number of
# worker processes.
CPU_COUNT = cpu_count()
44 | ||
45 | ||
def fork_for_tests(concurrency_num=CPU_COUNT):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.

    :return: A callable that takes a TestSuite and returns a list of
        TestCase-like objects, one per forked worker process.
    """
    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        The suite is split into `concurrency_num` partitions.  Each partition
        is run in its own forked child, which writes its results to a pipe
        through a subunit `TestProtocolClient`; the parent wraps the read end
        of each pipe in a `ProtocolTestCase`.

        :param suite: TestSuite object.  Its tests are removed from it.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            try:
                pid = os.fork()
            except OSError:
                # Don't leak the pipe if the fork itself fails.
                os.close(c2pread)
                os.close(c2pwrite)
                raise
            if pid == 0:
                # Child process.  It must always leave through os._exit() so
                # that it never falls back into the parent's code path.
                stream = None
                try:
                    stream = os.fdopen(c2pwrite, 'wb', 1)
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise its a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        TestProtocolClient(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() does not flush buffers, so push out anything
                    # still pending before leaving.
                    stream.flush()
                except BaseException:
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        if stream is not None:
                            report = traceback.format_exc()
                            # The stream is binary; encode text tracebacks so
                            # the write doesn't fail with a TypeError.
                            if not isinstance(report, bytes):
                                report = report.encode('utf-8', 'replace')
                            stream.write(report)
                            stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent process: keep only the read end of the pipe.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb', 1)
                test = ProtocolTestCase(stream)
                result.append(test)
        return result
    return do_fork
100 | ||
101 | ||
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: TestSuite object to split; it is walked with
        `iterate_tests`.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of `count` lists.  Tests are dealt out in order, so some
        lists are empty when there are fewer tests than partitions.

    :raises ValueError: if `count` is less than 1.  With no partitions to
        receive them, every test would otherwise be silently dropped.
    """
    if count < 1:
        raise ValueError('count must be at least 1, got %r' % (count,))
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    for index, test in enumerate(iterate_tests(suite)):
        partitions[index % count].append(test)
    return partitions
114 | ||
115 | ||
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Placeholder tests that only sleep, used to demo the speed-up."""

        def _pause(self):
            # Each demo test does nothing but wait half a second.
            time.sleep(0.5)

        def test_me_1(self):
            self._pause()

        def test_me_2(self):
            self._pause()

        def test_me_3(self):
            self._pause()

        def test_me_4(self):
            self._pause()

    def load_sample_suite():
        """Return a freshly loaded suite of the SampleTestCase tests."""
        return unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)

    runner = unittest.TextTestRunner()

    # First pass: run the tests one after another in this process
    runner.run(load_sample_suite())

    # Second pass: run a fresh copy of the same tests across 4 processes
    runner.run(ConcurrentTestSuite(load_sample_suite(), fork_for_tests(4)))