Loading django/core/management/commands/test.py +1 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ class Command(BaseCommand): action='store', dest='liveserver', default=None, help='Overrides the default address where the live server (used ' 'with LiveServerTestCase) is expected to run from. The ' 'default value is localhost:8081.'), 'default value is localhost:8081-8179.'), test_runner_class = get_runner(settings, self.test_runner) if hasattr(test_runner_class, 'option_list'): Loading django/db/backends/base/base.py +14 −0 Original line number Diff line number Diff line import copy import time import warnings from collections import deque Loading Loading @@ -622,3 +623,16 @@ class BaseDatabaseWrapper(object): func() finally: self.run_on_commit = [] def copy(self, alias=None, allow_thread_sharing=None): """ Return a copy of this connection. For tests that require two connections to the same database. """ settings_dict = copy.deepcopy(self.settings_dict) if alias is None: alias = self.alias if allow_thread_sharing is None: allow_thread_sharing = self.allow_thread_sharing return type(self)(settings_dict, alias, allow_thread_sharing) django/db/backends/base/creation.py +48 −4 Original line number Diff line number Diff line Loading @@ -190,13 +190,56 @@ class BaseDatabaseCreation(object): return test_database_name def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False): def clone_test_db(self, number, verbosity=1, autoclobber=False, keepdb=False): """ Clone a test database. """ source_database_name = self.connection.settings_dict['NAME'] if verbosity >= 1: test_db_repr = '' action = 'Cloning test database' if verbosity >= 2: test_db_repr = " ('%s')" % source_database_name if keepdb: action = 'Using existing clone' print("%s for alias '%s'%s..." % (action, self.connection.alias, test_db_repr)) # We could skip this call if keepdb is True, but we instead # give it the keepdb param. See create_test_db for details. self._clone_test_db(number, verbosity, keepdb) def get_test_db_clone_settings(self, number): """ Return a modified connection settings dict for the n-th clone of a DB. """ # When this function is called, the test database has been created # already and its name has been copied to settings_dict['NAME'] so # we don't need to call _get_test_db_name. orig_settings_dict = self.connection.settings_dict new_settings_dict = orig_settings_dict.copy() new_settings_dict['NAME'] = '{}_{}'.format(orig_settings_dict['NAME'], number) return new_settings_dict def _clone_test_db(self, number, verbosity, keepdb=False): """ Internal implementation - duplicate the test db tables. """ raise NotImplementedError( "The database backend doesn't support cloning databases. " "Disable the option to run tests in parallel processes.") def destroy_test_db(self, old_database_name=None, verbosity=1, keepdb=False, number=None): """ Destroy a test database, prompting the user for confirmation if the database already exists. """ self.connection.close() if number is None: test_database_name = self.connection.settings_dict['NAME'] else: test_database_name = self.get_test_db_clone_settings(number)['NAME'] if verbosity >= 1: test_db_repr = '' action = 'Destroying' Loading @@ -213,6 +256,7 @@ class BaseDatabaseCreation(object): self._destroy_test_db(test_database_name, verbosity) # Restore the original database name if old_database_name is not None: settings.DATABASES[self.connection.alias]["NAME"] = old_database_name self.connection.settings_dict["NAME"] = old_database_name Loading django/db/backends/base/features.py +4 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,10 @@ class BaseDatabaseFeatures(object): # every expression is null? greatest_least_ignores_nulls = False # Can the backend clone databases for parallel test execution? # Defaults to False to allow third-party backends to opt-in. can_clone_databases = False def __init__(self, connection): self.connection = connection Loading django/db/backends/mysql/creation.py +36 −0 Original line number Diff line number Diff line import subprocess import sys from django.db.backends.base.creation import BaseDatabaseCreation from .client import DatabaseClient class DatabaseCreation(BaseDatabaseCreation): Loading @@ -11,3 +16,34 @@ class DatabaseCreation(BaseDatabaseCreation): if test_settings['COLLATION']: suffix.append('COLLATE %s' % test_settings['COLLATION']) return ' '.join(suffix) def _clone_test_db(self, number, verbosity, keepdb=False): qn = self.connection.ops.quote_name source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] with self._nodb_connection.cursor() as cursor: try: cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) except Exception as e: if keepdb: return try: if verbosity >= 1: print("Destroying old test database '%s'..." % self.connection.alias) cursor.execute("DROP DATABASE %s" % qn(target_database_name)) cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) except Exception as e: sys.stderr.write("Got an error recreating the test database: %s\n" % e) sys.exit(2) dump_cmd = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict) dump_cmd[0] = 'mysqldump' dump_cmd[-1] = source_database_name load_cmd = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict) load_cmd[-1] = target_database_name dump_proc = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE) load_proc = subprocess.Popen(load_cmd, stdin=dump_proc.stdout, stdout=subprocess.PIPE) dump_proc.stdout.close() # allow dump_proc to receive a SIGPIPE if load_proc exits. load_proc.communicate() Loading
django/core/management/commands/test.py +1 −1 Original line number Diff line number Diff line Loading @@ -47,7 +47,7 @@ class Command(BaseCommand): action='store', dest='liveserver', default=None, help='Overrides the default address where the live server (used ' 'with LiveServerTestCase) is expected to run from. The ' 'default value is localhost:8081.'), 'default value is localhost:8081-8179.'), test_runner_class = get_runner(settings, self.test_runner) if hasattr(test_runner_class, 'option_list'): Loading
django/db/backends/base/base.py +14 −0 Original line number Diff line number Diff line import copy import time import warnings from collections import deque Loading Loading @@ -622,3 +623,16 @@ class BaseDatabaseWrapper(object): func() finally: self.run_on_commit = [] def copy(self, alias=None, allow_thread_sharing=None): """ Return a copy of this connection. For tests that require two connections to the same database. """ settings_dict = copy.deepcopy(self.settings_dict) if alias is None: alias = self.alias if allow_thread_sharing is None: allow_thread_sharing = self.allow_thread_sharing return type(self)(settings_dict, alias, allow_thread_sharing)
django/db/backends/base/creation.py +48 −4 Original line number Diff line number Diff line Loading @@ -190,13 +190,56 @@ class BaseDatabaseCreation(object): return test_database_name def destroy_test_db(self, old_database_name, verbosity=1, keepdb=False): def clone_test_db(self, number, verbosity=1, autoclobber=False, keepdb=False): """ Clone a test database. """ source_database_name = self.connection.settings_dict['NAME'] if verbosity >= 1: test_db_repr = '' action = 'Cloning test database' if verbosity >= 2: test_db_repr = " ('%s')" % source_database_name if keepdb: action = 'Using existing clone' print("%s for alias '%s'%s..." % (action, self.connection.alias, test_db_repr)) # We could skip this call if keepdb is True, but we instead # give it the keepdb param. See create_test_db for details. self._clone_test_db(number, verbosity, keepdb) def get_test_db_clone_settings(self, number): """ Return a modified connection settings dict for the n-th clone of a DB. """ # When this function is called, the test database has been created # already and its name has been copied to settings_dict['NAME'] so # we don't need to call _get_test_db_name. orig_settings_dict = self.connection.settings_dict new_settings_dict = orig_settings_dict.copy() new_settings_dict['NAME'] = '{}_{}'.format(orig_settings_dict['NAME'], number) return new_settings_dict def _clone_test_db(self, number, verbosity, keepdb=False): """ Internal implementation - duplicate the test db tables. """ raise NotImplementedError( "The database backend doesn't support cloning databases. " "Disable the option to run tests in parallel processes.") def destroy_test_db(self, old_database_name=None, verbosity=1, keepdb=False, number=None): """ Destroy a test database, prompting the user for confirmation if the database already exists. """ self.connection.close() if number is None: test_database_name = self.connection.settings_dict['NAME'] else: test_database_name = self.get_test_db_clone_settings(number)['NAME'] if verbosity >= 1: test_db_repr = '' action = 'Destroying' Loading @@ -213,6 +256,7 @@ class BaseDatabaseCreation(object): self._destroy_test_db(test_database_name, verbosity) # Restore the original database name if old_database_name is not None: settings.DATABASES[self.connection.alias]["NAME"] = old_database_name self.connection.settings_dict["NAME"] = old_database_name Loading
django/db/backends/base/features.py +4 −0 Original line number Diff line number Diff line Loading @@ -212,6 +212,10 @@ class BaseDatabaseFeatures(object): # every expression is null? greatest_least_ignores_nulls = False # Can the backend clone databases for parallel test execution? # Defaults to False to allow third-party backends to opt-in. can_clone_databases = False def __init__(self, connection): self.connection = connection Loading
django/db/backends/mysql/creation.py +36 −0 Original line number Diff line number Diff line import subprocess import sys from django.db.backends.base.creation import BaseDatabaseCreation from .client import DatabaseClient class DatabaseCreation(BaseDatabaseCreation): Loading @@ -11,3 +16,34 @@ class DatabaseCreation(BaseDatabaseCreation): if test_settings['COLLATION']: suffix.append('COLLATE %s' % test_settings['COLLATION']) return ' '.join(suffix) def _clone_test_db(self, number, verbosity, keepdb=False): qn = self.connection.ops.quote_name source_database_name = self.connection.settings_dict['NAME'] target_database_name = self.get_test_db_clone_settings(number)['NAME'] with self._nodb_connection.cursor() as cursor: try: cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) except Exception as e: if keepdb: return try: if verbosity >= 1: print("Destroying old test database '%s'..." % self.connection.alias) cursor.execute("DROP DATABASE %s" % qn(target_database_name)) cursor.execute("CREATE DATABASE %s" % qn(target_database_name)) except Exception as e: sys.stderr.write("Got an error recreating the test database: %s\n" % e) sys.exit(2) dump_cmd = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict) dump_cmd[0] = 'mysqldump' dump_cmd[-1] = source_database_name load_cmd = DatabaseClient.settings_to_cmd_args(self.connection.settings_dict) load_cmd[-1] = target_database_name dump_proc = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE) load_proc = subprocess.Popen(load_cmd, stdin=dump_proc.stdout, stdout=subprocess.PIPE) dump_proc.stdout.close() # allow dump_proc to receive a SIGPIPE if load_proc exits. load_proc.communicate()