Commit cd9fcd4e authored by Aymeric Augustin's avatar Aymeric Augustin
Browse files

Implemented a parallel test runner.

parent acb83308
Loading
Loading
Loading
Loading
+226 −8
Original line number Diff line number Diff line
import collections
import itertools
import logging
import multiprocessing
import os
import unittest
from importlib import import_module
from unittest import TestSuite, defaultTestLoader

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
@@ -13,6 +14,11 @@ from django.test.utils import setup_test_environment, teardown_test_environment
from django.utils.datastructures import OrderedSet
from django.utils.six import StringIO

# tblib makes traceback objects picklable so that worker processes can ship
# full error details back to the master process (see ParallelTestSuite.run).
# It's an optional dependency: fall back to None so callers can detect its
# absence and skip installing pickling support.
try:
    import tblib.pickling_support
except ImportError:
    tblib = None


class DebugSQLTextTestResult(unittest.TextTestResult):
    def __init__(self, stream, descriptions, verbosity):
@@ -54,19 +60,206 @@ class DebugSQLTextTestResult(unittest.TextTestResult):
            self.stream.writeln("%s" % sql_debug)


class RemoteTestResult(object):
    """
    Record which tests succeeded and which failed in a worker process.

    Rather than inheriting unittest.TestResult, this class appends a plain
    tuple to ``self.events`` for every callback it receives, so the events
    can be pickled, shipped to the master process, and replayed there against
    a real result object. Only the parts of the TestResult API that workers
    actually exercise are implemented.

    The camelCase method names mirror unittest's API for compatibility.
    """

    def __init__(self):
        # Each event is (method_name, test_index, *extra_args); events with
        # no associated test (startTestRun/stopTestRun) omit the index.
        self.events = []
        self.failfast = False
        self.shouldStop = False
        self.testsRun = 0

    @property
    def test_index(self):
        # Index of the test currently running; valid once startTest has
        # incremented testsRun.
        return self.testsRun - 1

    def _record(self, *event):
        # Store one picklable event tuple for later replay.
        self.events.append(event)

    def stop(self):
        self.shouldStop = True

    def stop_if_failfast(self):
        if self.failfast:
            self.stop()

    def startTestRun(self):
        self._record('startTestRun')

    def stopTestRun(self):
        self._record('stopTestRun')

    def startTest(self, test):
        self.testsRun += 1
        self._record('startTest', self.test_index)

    def stopTest(self, test):
        self._record('stopTest', self.test_index)

    def addError(self, test, err):
        self._record('addError', self.test_index, err)
        self.stop_if_failfast()

    def addFailure(self, test, err):
        self._record('addFailure', self.test_index, err)
        self.stop_if_failfast()

    def addSubTest(self, test, subtest, err):
        raise NotImplementedError("subtests aren't supported at this time")

    def addSuccess(self, test):
        self._record('addSuccess', self.test_index)

    def addSkip(self, test, reason):
        self._record('addSkip', self.test_index, reason)

    def addExpectedFailure(self, test, err):
        self._record('addExpectedFailure', self.test_index, err)

    def addUnexpectedSuccess(self, test):
        self._record('addUnexpectedSuccess', self.test_index)
        self.stop_if_failfast()


class RemoteTestRunner(object):
    """
    Run tests inside a worker process, recording events without displaying
    anything.

    Results are collected in a RemoteTestResult so they can be replayed by
    the master process later. The camelCase attribute names mirror
    unittest's API for compatibility.
    """

    # Default result class; may be overridden per-instance via __init__.
    resultclass = RemoteTestResult

    def __init__(self, failfast=False, resultclass=None):
        self.failfast = failfast
        if resultclass is not None:
            self.resultclass = resultclass

    def run(self, test):
        result = self.resultclass()
        result.failfast = self.failfast
        # Register the result so unittest's Ctrl-C handling can notify it.
        unittest.registerResult(result)
        test(result)
        return result


def default_test_processes():
    """
    Default number of test processes when using the --parallel option.

    Honors the DJANGO_TEST_PROCESSES environment variable when set;
    otherwise uses one process per CPU core.
    """
    env_value = os.environ.get('DJANGO_TEST_PROCESSES')
    if env_value is None:
        return multiprocessing.cpu_count()
    return int(env_value)


def _run_subsuite(args):
    """
    Run a suite of tests with a RemoteTestRunner and return a RemoteTestResult.

    This helper lives at module-level and its arguments are wrapped in a tuple
    because of idiosyncrasies of Python's multiprocessing module.
    """
    index, subsuite, failfast = args
    # Return the (picklable) event list, not the result object itself, along
    # with the subsuite's index so the master can map events back to tests.
    events = RemoteTestRunner(failfast=failfast).run(subsuite).events
    return index, events


class ParallelTestSuite(unittest.TestSuite):
    """
    Run a series of tests in parallel in several processes.

    While the unittest module's documentation implies that orchestrating the
    execution of tests is the responsibility of the test runner, in practice,
    it appears that TestRunner classes are more concerned with formatting and
    displaying test results.

    Since there are fewer use cases for customizing TestSuite than TestRunner,
    implementing parallelization at the level of the TestSuite improves
    interoperability with existing custom test runners. A single instance of a
    test runner can still collect results from all tests without being aware
    that they have been run in parallel.
    """

    def __init__(self, suite, processes, failfast=False):
        # Each subsuite groups tests of one TestCase class (see
        # partition_suite_by_case); a subsuite is the unit of work that gets
        # dispatched to a worker process.
        self.subsuites = partition_suite_by_case(suite)
        self.processes = processes
        self.failfast = failfast
        super(ParallelTestSuite, self).__init__()

    def run(self, result):
        """
        Distribute test cases across workers.

        Return an identifier of each test case with its result in order to use
        imap_unordered to show results as soon as they're available.

        To minimize pickling errors when getting results from workers:

        - pass back numeric indexes in self.subsuites instead of tests
        - make tracebacks pickleable with tblib, if available

        Even with tblib, errors may still occur for dynamically created
        exception classes such as Model.DoesNotExist which cannot be unpickled.
        """
        # Install picklable-traceback support in this (master) process so
        # tracebacks sent back by workers can be unpickled.
        if tblib is not None:
            tblib.pickling_support.install()

        pool = multiprocessing.Pool(processes=self.processes)
        args = [
            (index, subsuite, self.failfast)
            for index, subsuite in enumerate(self.subsuites)
        ]
        test_results = pool.imap_unordered(_run_subsuite, args)

        while True:
            if result.shouldStop:
                # Abort outstanding work immediately (e.g. when failfast
                # stopped the run after a failure).
                pool.terminate()
                break

            try:
                # Short timeout so result.shouldStop is polled regularly
                # while waiting for the next completed subsuite.
                subsuite_index, events = test_results.next(timeout=0.1)
            except multiprocessing.TimeoutError:
                continue
            except StopIteration:
                # All subsuites have reported back; let workers exit.
                pool.close()
                break

            # Replay the worker's recorded events against the real result,
            # mapping each numeric test index back to the actual test object.
            tests = list(self.subsuites[subsuite_index])
            for event in events:
                event_name = event[0]
                handler = getattr(result, event_name, None)
                if handler is None:
                    continue
                test = tests[event[1]]
                # NOTE(review): this rebinds the outer ``args`` list —
                # harmless since it's no longer needed, but easy to misread.
                args = event[2:]
                handler(test, *args)

        pool.join()

        return result


class DiscoverRunner(object):
    """
    A Django test runner that uses unittest2 test discovery.
    """

    test_suite = TestSuite
    test_suite = unittest.TestSuite
    test_runner = unittest.TextTestRunner
    test_loader = defaultTestLoader
    test_loader = unittest.defaultTestLoader
    reorder_by = (TestCase, SimpleTestCase)

    def __init__(self, pattern=None, top_level=None, verbosity=1,
                 interactive=True, failfast=False, keepdb=False,
                 reverse=False, debug_sql=False, **kwargs):
                 reverse=False, debug_sql=False, parallel=0,
                 **kwargs):

        self.pattern = pattern
        self.top_level = top_level
@@ -77,6 +270,7 @@ class DiscoverRunner(object):
        self.keepdb = keepdb
        self.reverse = reverse
        self.debug_sql = debug_sql
        self.parallel = parallel

    @classmethod
    def add_arguments(cls, parser):
@@ -95,6 +289,10 @@ class DiscoverRunner(object):
        parser.add_argument('-d', '--debug-sql', action='store_true', dest='debug_sql',
            default=False,
            help='Prints logged SQL queries on failure.')
        parser.add_argument(
            '--parallel', dest='parallel', nargs='?', default=1, type=int,
            const=default_test_processes(),
            help='Run tests in parallel processes.')

    def setup_test_environment(self, **kwargs):
        setup_test_environment()
@@ -160,7 +358,12 @@ class DiscoverRunner(object):
        for test in extra_tests:
            suite.addTest(test)

        return reorder_suite(suite, self.reorder_by, self.reverse)
        suite = reorder_suite(suite, self.reorder_by, self.reverse)

        if self.parallel > 1:
            suite = ParallelTestSuite(suite, self.parallel, self.failfast)

        return suite

    def setup_databases(self, **kwargs):
        return setup_databases(
@@ -288,14 +491,14 @@ def reorder_suite(suite, classes, reverse=False):
    class_count = len(classes)
    suite_class = type(suite)
    bins = [OrderedSet() for i in range(class_count + 1)]
    partition_suite(suite, classes, bins, reverse=reverse)
    partition_suite_by_type(suite, classes, bins, reverse=reverse)
    reordered_suite = suite_class()
    for i in range(class_count + 1):
        reordered_suite.addTests(bins[i])
    return reordered_suite


def partition_suite(suite, classes, bins, reverse=False):
def partition_suite_by_type(suite, classes, bins, reverse=False):
    """
    Partitions a test suite by test type. Also prevents duplicated tests.

@@ -311,7 +514,7 @@ def partition_suite(suite, classes, bins, reverse=False):
        suite = reversed(tuple(suite))
    for test in suite:
        if isinstance(test, suite_class):
            partition_suite(test, classes, bins, reverse=reverse)
            partition_suite_by_type(test, classes, bins, reverse=reverse)
        else:
            for i in range(len(classes)):
                if isinstance(test, classes[i]):
@@ -321,6 +524,21 @@ def partition_suite(suite, classes, bins, reverse=False):
                bins[-1].add(test)


def partition_suite_by_case(suite):
    """
    Partitions a test suite by test case, preserving the order of tests.
    """
    groups = []
    suite_class = type(suite)
    for test_type, test_group in itertools.groupby(suite, type):
        if issubclass(test_type, unittest.TestCase):
            groups.append(suite_class(test_group))
        else:
            for item in test_group:
                groups.extend(partition_suite_by_case(item))
    return groups


def get_unique_databases():
    """
    Figure out which databases actually need to be created.
+20 −0
Original line number Diff line number Diff line
@@ -1257,6 +1257,26 @@ The ``--debug-sql`` option can be used to enable :ref:`SQL logging
<django-db-logger>` for failing tests. If :djadminopt:`--verbosity` is ``2``,
then queries in passing tests are also output.

.. django-admin-option:: --parallel

.. versionadded:: 1.9

The ``--parallel`` option can be used to run tests in parallel in separate
processes. Since modern processors have multiple cores, this allows running
tests significantly faster.

By default ``--parallel`` runs one process per core according to
:func:`multiprocessing.cpu_count()`. You can adjust the number of processes
either by providing it as the option's value, e.g. ``--parallel=4``, or by
setting the ``DJANGO_TEST_PROCESSES`` environment variable.

This option requires the third-party ``tblib`` package to display tracebacks
correctly:

.. code-block:: console

    $ pip install tblib

testserver <fixture fixture ...>
--------------------------------

+6 −0
Original line number Diff line number Diff line
@@ -125,6 +125,12 @@ degradation.

.. _YUI's A-grade: https://github.com/yui/yui3/wiki/Graded-Browser-Support

Running tests in parallel
~~~~~~~~~~~~~~~~~~~~~~~~~

The :djadmin:`test` command now supports a :djadminopt:`--parallel` option to
run a project's tests in multiple processes in parallel.

Minor features
~~~~~~~~~~~~~~

+16 −7
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ from django.apps import apps
from django.conf import settings
from django.db import connection
from django.test import TestCase, TransactionTestCase
from django.test.runner import default_test_processes
from django.test.utils import get_runner
from django.utils import six
from django.utils._os import upath
@@ -93,9 +94,10 @@ def get_installed():
    return [app_config.name for app_config in apps.get_app_configs()]


def setup(verbosity, test_labels):
def setup(verbosity, test_labels, parallel):
    if verbosity >= 1:
        print("Testing against Django installed in '%s'" % os.path.dirname(django.__file__))
        print("Testing against Django installed in '%s' with %d processes" % (
            os.path.dirname(django.__file__), parallel))

    # Force declaring available_apps in TransactionTestCase for faster tests.
    def no_available_apps(self):
@@ -232,8 +234,9 @@ def teardown(state):
        setattr(settings, key, value)


def django_tests(verbosity, interactive, failfast, keepdb, reverse, test_labels, debug_sql):
    state = setup(verbosity, test_labels)
def django_tests(verbosity, interactive, failfast, keepdb, reverse,
                 test_labels, debug_sql, parallel):
    state = setup(verbosity, test_labels, parallel)
    extra_tests = []

    # Run the test suite, including the extra validation tests.
@@ -248,6 +251,7 @@ def django_tests(verbosity, interactive, failfast, keepdb, reverse, test_labels,
        keepdb=keepdb,
        reverse=reverse,
        debug_sql=debug_sql,
        parallel=parallel,
    )
    failures = test_runner.run_tests(
        test_labels or get_installed(),
@@ -389,10 +393,15 @@ if __name__ == "__main__":
             'is localhost:8081.')
    parser.add_argument(
        '--selenium', action='store_true', dest='selenium', default=False,
        help='Run the Selenium tests as well (if Selenium is installed)')
        help='Run the Selenium tests as well (if Selenium is installed).')
    parser.add_argument(
        '--debug-sql', action='store_true', dest='debug_sql', default=False,
        help='Turn on the SQL query logger within tests')
        help='Turn on the SQL query logger within tests.')
    parser.add_argument(
        '--parallel', dest='parallel', nargs='?', default=1, type=int,
        const=default_test_processes(),
        help='Run tests in parallel processes.')

    options = parser.parse_args()

    # mock is a required dependency
@@ -429,6 +438,6 @@ if __name__ == "__main__":
        failures = django_tests(options.verbosity, options.interactive,
                                options.failfast, options.keepdb,
                                options.reverse, options.modules,
                                options.debug_sql)
                                options.debug_sql, options.parallel)
        if failures:
            sys.exit(bool(failures))