1#!/usr/bin/env python
2# SPDX-License-Identifier: GPL-2.0+
3#
4# Modified by: Corey Goldberg, 2013
5#
6# Original code from:
7#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
8#   Copyright (C) 2005-2011 Canonical Ltd
9
10"""Python testtools extension for running unittest suites concurrently.
11
12The `testtools` project provides a ConcurrentTestSuite class, but does
13not provide a `make_tests` implementation needed to use it.
14
15This allows you to parallelize a test run across a configurable number
16of worker processes. While this can speed up CPU-bound test runs, it is
17mainly useful for IO-bound tests that spend most of their time waiting for
data to arrive from someplace else and can benefit from concurrency.
19
20Unix only.
21"""
22
23import os
24import sys
25import traceback
26import unittest
27from itertools import cycle
28from multiprocessing import cpu_count
29
30from subunit import ProtocolTestCase, TestProtocolClient
31from subunit.test_results import AutoTimingTestResultDecorator
32
33from testtools import ConcurrentTestSuite, iterate_tests
34
35
# Public API of this module: the names exported by a star-import.
# (The name must be spelled ``__all__``; any other spelling is just an
# ordinary, unused module variable.)
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Number of CPUs reported by multiprocessing; used as the default number of
# worker processes for fork_for_tests().
CPU_COUNT = cpu_count()
44
45
def fork_for_tests(concurrency_num=CPU_COUNT):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.

    :return: A callable that takes a TestSuite, forks the worker processes
        and returns the objects the parent uses to collect their results.
    """
    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        The suite is split with `partition_tests`.  One child process is
        forked per partition; the child runs its share of the tests and
        writes the results through a `TestProtocolClient` to a pipe, which
        the parent reads back through a `ProtocolTestCase`.

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
        run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child process.  Rebind `stream` first: after the first
                # loop iteration the inherited name still refers to the
                # parent's read end of the previous child's pipe, which must
                # never be used for error reporting here.
                stream = None
                try:
                    # Binary mode does not support line buffering
                    # (buffering=1), so use the default buffer size and
                    # flush explicitly before exiting.
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise it's a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        TestProtocolClient(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() skips normal interpreter cleanup, so push
                    # any buffered results into the pipe ourselves.
                    stream.flush()
                except BaseException:
                    # Deliberately catch everything (including SystemExit and
                    # KeyboardInterrupt): the child must never fall back into
                    # the parent's code path.
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong.  The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        if stream is not None:
                            report = traceback.format_exc()
                            # The stream is binary; encode text tracebacks
                            # so the write does not fail with TypeError.
                            if not isinstance(report, bytes):
                                report = report.encode('utf-8', 'replace')
                            stream.write(report)
                            stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent process: keep only the read end of the pipe and
                # wrap it so the child's results can be replayed later.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                test = ProtocolTestCase(stream)
                result.append(test)
        return result
    return do_fork
100
101
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: TestSuite object; it is flattened with `iterate_tests`.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of `count` lists.  Tests are dealt out one at a time,
        so some lists may be empty when there are fewer tests than
        partitions.

    :raises ValueError: if count is less than 1.  Without this check no
        partitions would be created and every test would be silently
        dropped.
    """
    if count < 1:
        raise ValueError(
            'count must be at least 1, got %r' % (count,)
        )
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    for partition, test in zip(cycle(partitions), iterate_tests(suite)):
        partition.append(test)
    return partitions
114
115
if __name__ == '__main__':
    # Small demo: run the same four sleeping tests first sequentially and
    # then spread across four forked processes.
    import time

    class SampleTestCase(unittest.TestCase):
        """Placeholder tests; each one only sleeps for half a second."""

        def _pause(self):
            # Shared body of every demo test (not collected as a test
            # itself because the name does not start with "test").
            time.sleep(0.5)

        def test_me_1(self):
            self._pause()

        def test_me_2(self):
            self._pause()

        def test_me_3(self):
            self._pause()

        def test_me_4(self):
            self._pause()

    loader = unittest.TestLoader()
    text_runner = unittest.TextTestRunner()

    # Sequential run of the tests in SampleTestCase
    sequential_tests = loader.loadTestsFromTestCase(SampleTestCase)
    text_runner.run(sequential_tests)

    # Freshly loaded copy of the same tests, run across 4 processes
    parallel_tests = loader.loadTestsFromTestCase(SampleTestCase)
    text_runner.run(ConcurrentTestSuite(parallel_tests, fork_for_tests(4)))
145