Loading django/db/migrations/executor.py +16 −3 Original line number Diff line number Diff line from __future__ import unicode_literals from django.apps.registry import apps as global_apps from django.db import migrations from django.db import migrations, router from .exceptions import InvalidMigrationPlan from .loader import MigrationLoader Loading Loading @@ -250,6 +250,19 @@ class MigrationExecutor(object): tables or columns it would create exist. This is intended only for use on initial migrations (as it only looks for CreateModel and AddField). """ def should_skip_detecting_model(migration, model): """ No need to detect tables for proxy models, unmanaged models, or models that can't be migrated on the current database. """ return ( model._meta.proxy or not model._meta.managed or not router.allow_migrate( self.connection.alias, migration.app_label, model_name=model._meta.model_name, ) ) if migration.initial is None: # Bail if the migration isn't the first one in its app if any(app == migration.app_label for app, name in migration.dependencies): Loading @@ -274,7 +287,7 @@ class MigrationExecutor(object): # We have to fetch the model to test with from the # main app cache, as it's not a direct dependency. model = global_apps.get_model(model._meta.swapped) if model._meta.proxy or not model._meta.managed: if should_skip_detecting_model(migration, model): continue if model._meta.db_table not in existing_table_names: return False, project_state Loading @@ -285,7 +298,7 @@ class MigrationExecutor(object): # We have to fetch the model to test with from the # main app cache, as it's not a direct dependency. model = global_apps.get_model(model._meta.swapped) if model._meta.proxy or not model._meta.managed: if should_skip_detecting_model(migration, model): continue table = model._meta.db_table Loading tests/migrations/routers.py 0 → 100644 +9 −0 Original line number Diff line number Diff line class TestRouter(object): def allow_migrate(self, db, app_label, model_name=None, **hints): """ The Tribble model should be the only one to appear in the 'other' db. """ if model_name == 'tribble': return db == 'other' elif db == 'other': return False tests/migrations/test_base.py +30 −26 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from contextlib import contextmanager from importlib import import_module from django.apps import apps from django.db import connection from django.db import connections from django.db.migrations.recorder import MigrationRecorder from django.test import TransactionTestCase from django.test.utils import extend_sys_path Loading @@ -21,40 +21,44 @@ class MigrationTestBase(TransactionTestCase): def tearDown(self): # Reset applied-migrations state. recorder = MigrationRecorder(connection) for db in connections: recorder = MigrationRecorder(connections[db]) recorder.migration_qs.filter(app='migrations').delete() def get_table_description(self, table): with connection.cursor() as cursor: return connection.introspection.get_table_description(cursor, table) def get_table_description(self, table, using='default'): with connections[using].cursor() as cursor: return connections[using].introspection.get_table_description(cursor, table) def assertTableExists(self, table): with connection.cursor() as cursor: self.assertIn(table, connection.introspection.table_names(cursor)) def assertTableExists(self, table, using='default'): with connections[using].cursor() as cursor: self.assertIn(table, connections[using].introspection.table_names(cursor)) def assertTableNotExists(self, table): with connection.cursor() as cursor: self.assertNotIn(table, connection.introspection.table_names(cursor)) def assertTableNotExists(self, table, using='default'): with connections[using].cursor() as cursor: self.assertNotIn(table, connections[using].introspection.table_names(cursor)) def assertColumnExists(self, table, column): self.assertIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnExists(self, table, column, using='default'): self.assertIn(column, [c.name for c in self.get_table_description(table, using=using)]) def assertColumnNotExists(self, table, column): self.assertNotIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnNotExists(self, table, column, using='default'): self.assertNotIn(column, [c.name for c in self.get_table_description(table, using=using)]) def assertColumnNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], True) def _get_column_allows_null(self, table, column, using): return [c.null_ok for c in self.get_table_description(table, using=using) if c.name == column][0] def assertColumnNotNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], False) def assertColumnNull(self, table, column, using='default'): self.assertEqual(self._get_column_allows_null(table, column, using), True) def assertIndexExists(self, table, columns, value=True): with connection.cursor() as cursor: def assertColumnNotNull(self, table, column, using='default'): self.assertEqual(self._get_column_allows_null(table, column, using), False) def assertIndexExists(self, table, columns, value=True, using='default'): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["index"] for c in connection.introspection.get_constraints(cursor, table).values() for c in connections[using].introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) Loading @@ -62,13 +66,13 @@ class MigrationTestBase(TransactionTestCase): def assertIndexNotExists(self, table, columns): return self.assertIndexExists(table, columns, False) def assertFKExists(self, table, columns, to, value=True): with connection.cursor() as cursor: def assertFKExists(self, table, columns, to, value=True, using='default'): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["foreign_key"] == to for c in connection.introspection.get_constraints(cursor, table).values() for c in connections[using].introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) Loading tests/migrations/test_commands.py +33 −12 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ import os from django.apps import apps from django.core.management import CommandError, call_command from django.db import DatabaseError, connection, models from django.db import DatabaseError, connection, connections, models from django.db.migrations.recorder import MigrationRecorder from django.test import ignore_warnings, mock, override_settings from django.utils import six Loading @@ -22,6 +22,7 @@ class MigrateTests(MigrationTestBase): """ Tests running the migrate command. """ multi_db = True @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) def test_migrate(self): Loading Loading @@ -75,25 +76,36 @@ class MigrateTests(MigrationTestBase): self.assertTableNotExists("migrations_tribble") self.assertTableNotExists("migrations_book") @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) @override_settings( MIGRATION_MODULES={"migrations": "migrations.test_migrations"}, DATABASE_ROUTERS=['migrations.routers.TestRouter'], ) def test_migrate_fake_initial(self): """ #24184 - Tests that --fake-initial only works if all tables created in the initial migration of an app exists --fake-initial only works if all tables created in the initial migration of an app exists. Database routers must be obeyed when doing that check. """ # Make sure no tables are created self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") for db in connections: self.assertTableNotExists("migrations_author", using=db) self.assertTableNotExists("migrations_tribble", using=db) # Run the migrations to 0001 only call_command("migrate", "migrations", "0001", verbosity=0) call_command("migrate", "migrations", "0001", verbosity=0, database="other") # Make sure the right tables exist self.assertTableExists("migrations_author") self.assertTableExists("migrations_tribble") self.assertTableNotExists("migrations_tribble") # Also check the "other" database self.assertTableNotExists("migrations_author", using="other") self.assertTableExists("migrations_tribble", using="other") # Fake a roll-back call_command("migrate", "migrations", "zero", fake=True, verbosity=0) call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other") # Make sure the tables still exist self.assertTableExists("migrations_author") self.assertTableExists("migrations_tribble") self.assertTableExists("migrations_tribble", using="other") # Try to run initial migration with self.assertRaises(DatabaseError): call_command("migrate", "migrations", "0001", verbosity=0) Loading @@ -101,18 +113,24 @@ class MigrateTests(MigrationTestBase): out = six.StringIO() with mock.patch('django.core.management.color.supports_color', lambda *args: False): call_command("migrate", "migrations", "0001", fake_initial=True, stdout=out, verbosity=1) call_command("migrate", "migrations", "0001", fake_initial=True, verbosity=0, database="other") self.assertIn( "migrations.0001_initial... faked", out.getvalue().lower() ) # Run migrations all the way call_command("migrate", verbosity=0) call_command("migrate", verbosity=0, database="other") # Make sure the right tables exist self.assertTableExists("migrations_author") self.assertTableNotExists("migrations_tribble") self.assertTableExists("migrations_book") self.assertTableNotExists("migrations_author", using="other") self.assertTableNotExists("migrations_tribble", using="other") self.assertTableNotExists("migrations_book", using="other") # Fake a roll-back call_command("migrate", "migrations", "zero", fake=True, verbosity=0) call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other") # Make sure the tables still exist self.assertTableExists("migrations_author") self.assertTableNotExists("migrations_tribble") Loading @@ -127,12 +145,15 @@ class MigrateTests(MigrationTestBase): call_command("migrate", "migrations", fake_initial=True, verbosity=0) # Fake a apply call_command("migrate", "migrations", fake=True, verbosity=0) call_command("migrate", "migrations", fake=True, verbosity=0, database="other") # Unmigrate everything call_command("migrate", "migrations", "zero", verbosity=0) call_command("migrate", "migrations", "zero", verbosity=0, database="other") # Make sure it's all gone self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") self.assertTableNotExists("migrations_book") for db in connections: self.assertTableNotExists("migrations_author", using=db) self.assertTableNotExists("migrations_tribble", using=db) self.assertTableNotExists("migrations_book", using=db) @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_fake_split_initial"}) def test_migrate_fake_split_initial(self): Loading Loading @@ -1066,7 +1087,7 @@ class SquashMigrationsTests(MigrationTestBase): out = six.StringIO() with self.temporary_migration_module(module="migrations.test_migrations"): call_command("squashmigrations", "migrations", "0002", interactive=False, verbosity=1, stdout=out) self.assertIn("Optimized from 7 operations to 3 operations.", force_text(out.getvalue())) self.assertIn("Optimized from 8 operations to 3 operations.", force_text(out.getvalue())) def test_ticket_23799_squashmigrations_no_optimize(self): """ Loading tests/migrations/test_migrations/0001_initial.py +5 −4 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ class Migration(migrations.Migration): initial = True operations = [ migrations.CreateModel( "Author", [ Loading @@ -20,7 +19,6 @@ class Migration(migrations.Migration): ("silly_field", models.BooleanField(default=False)), ], ), migrations.CreateModel( "Tribble", [ Loading @@ -28,10 +26,13 @@ class Migration(migrations.Migration): ("fluffy", models.BooleanField(default=True)), ], ), migrations.AddField( model_name='tribble', name='bool', field=models.BooleanField(default=False), ), migrations.AlterUniqueTogether( name='author', unique_together=set([('name', 'slug')]), ), ] Loading
django/db/migrations/executor.py +16 −3 Original line number Diff line number Diff line from __future__ import unicode_literals from django.apps.registry import apps as global_apps from django.db import migrations from django.db import migrations, router from .exceptions import InvalidMigrationPlan from .loader import MigrationLoader Loading Loading @@ -250,6 +250,19 @@ class MigrationExecutor(object): tables or columns it would create exist. This is intended only for use on initial migrations (as it only looks for CreateModel and AddField). """ def should_skip_detecting_model(migration, model): """ No need to detect tables for proxy models, unmanaged models, or models that can't be migrated on the current database. """ return ( model._meta.proxy or not model._meta.managed or not router.allow_migrate( self.connection.alias, migration.app_label, model_name=model._meta.model_name, ) ) if migration.initial is None: # Bail if the migration isn't the first one in its app if any(app == migration.app_label for app, name in migration.dependencies): Loading @@ -274,7 +287,7 @@ class MigrationExecutor(object): # We have to fetch the model to test with from the # main app cache, as it's not a direct dependency. model = global_apps.get_model(model._meta.swapped) if model._meta.proxy or not model._meta.managed: if should_skip_detecting_model(migration, model): continue if model._meta.db_table not in existing_table_names: return False, project_state Loading @@ -285,7 +298,7 @@ class MigrationExecutor(object): # We have to fetch the model to test with from the # main app cache, as it's not a direct dependency. model = global_apps.get_model(model._meta.swapped) if model._meta.proxy or not model._meta.managed: if should_skip_detecting_model(migration, model): continue table = model._meta.db_table Loading
tests/migrations/routers.py 0 → 100644 +9 −0 Original line number Diff line number Diff line class TestRouter(object): def allow_migrate(self, db, app_label, model_name=None, **hints): """ The Tribble model should be the only one to appear in the 'other' db. """ if model_name == 'tribble': return db == 'other' elif db == 'other': return False
tests/migrations/test_base.py +30 −26 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from contextlib import contextmanager from importlib import import_module from django.apps import apps from django.db import connection from django.db import connections from django.db.migrations.recorder import MigrationRecorder from django.test import TransactionTestCase from django.test.utils import extend_sys_path Loading @@ -21,40 +21,44 @@ class MigrationTestBase(TransactionTestCase): def tearDown(self): # Reset applied-migrations state. recorder = MigrationRecorder(connection) for db in connections: recorder = MigrationRecorder(connections[db]) recorder.migration_qs.filter(app='migrations').delete() def get_table_description(self, table): with connection.cursor() as cursor: return connection.introspection.get_table_description(cursor, table) def get_table_description(self, table, using='default'): with connections[using].cursor() as cursor: return connections[using].introspection.get_table_description(cursor, table) def assertTableExists(self, table): with connection.cursor() as cursor: self.assertIn(table, connection.introspection.table_names(cursor)) def assertTableExists(self, table, using='default'): with connections[using].cursor() as cursor: self.assertIn(table, connections[using].introspection.table_names(cursor)) def assertTableNotExists(self, table): with connection.cursor() as cursor: self.assertNotIn(table, connection.introspection.table_names(cursor)) def assertTableNotExists(self, table, using='default'): with connections[using].cursor() as cursor: self.assertNotIn(table, connections[using].introspection.table_names(cursor)) def assertColumnExists(self, table, column): self.assertIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnExists(self, table, column, using='default'): self.assertIn(column, [c.name for c in self.get_table_description(table, using=using)]) def assertColumnNotExists(self, table, column): self.assertNotIn(column, [c.name for c in self.get_table_description(table)]) def assertColumnNotExists(self, table, column, using='default'): self.assertNotIn(column, [c.name for c in self.get_table_description(table, using=using)]) def assertColumnNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], True) def _get_column_allows_null(self, table, column, using): return [c.null_ok for c in self.get_table_description(table, using=using) if c.name == column][0] def assertColumnNotNull(self, table, column): self.assertEqual([c.null_ok for c in self.get_table_description(table) if c.name == column][0], False) def assertColumnNull(self, table, column, using='default'): self.assertEqual(self._get_column_allows_null(table, column, using), True) def assertIndexExists(self, table, columns, value=True): with connection.cursor() as cursor: def assertColumnNotNull(self, table, column, using='default'): self.assertEqual(self._get_column_allows_null(table, column, using), False) def assertIndexExists(self, table, columns, value=True, using='default'): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["index"] for c in connection.introspection.get_constraints(cursor, table).values() for c in connections[using].introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) Loading @@ -62,13 +66,13 @@ class MigrationTestBase(TransactionTestCase): def assertIndexNotExists(self, table, columns): return self.assertIndexExists(table, columns, False) def assertFKExists(self, table, columns, to, value=True): with connection.cursor() as cursor: def assertFKExists(self, table, columns, to, value=True, using='default'): with connections[using].cursor() as cursor: self.assertEqual( value, any( c["foreign_key"] == to for c in connection.introspection.get_constraints(cursor, table).values() for c in connections[using].introspection.get_constraints(cursor, table).values() if c['columns'] == list(columns) ), ) Loading
tests/migrations/test_commands.py +33 −12 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ import os from django.apps import apps from django.core.management import CommandError, call_command from django.db import DatabaseError, connection, models from django.db import DatabaseError, connection, connections, models from django.db.migrations.recorder import MigrationRecorder from django.test import ignore_warnings, mock, override_settings from django.utils import six Loading @@ -22,6 +22,7 @@ class MigrateTests(MigrationTestBase): """ Tests running the migrate command. """ multi_db = True @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) def test_migrate(self): Loading Loading @@ -75,25 +76,36 @@ class MigrateTests(MigrationTestBase): self.assertTableNotExists("migrations_tribble") self.assertTableNotExists("migrations_book") @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations"}) @override_settings( MIGRATION_MODULES={"migrations": "migrations.test_migrations"}, DATABASE_ROUTERS=['migrations.routers.TestRouter'], ) def test_migrate_fake_initial(self): """ #24184 - Tests that --fake-initial only works if all tables created in the initial migration of an app exists --fake-initial only works if all tables created in the initial migration of an app exists. Database routers must be obeyed when doing that check. """ # Make sure no tables are created self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") for db in connections: self.assertTableNotExists("migrations_author", using=db) self.assertTableNotExists("migrations_tribble", using=db) # Run the migrations to 0001 only call_command("migrate", "migrations", "0001", verbosity=0) call_command("migrate", "migrations", "0001", verbosity=0, database="other") # Make sure the right tables exist self.assertTableExists("migrations_author") self.assertTableExists("migrations_tribble") self.assertTableNotExists("migrations_tribble") # Also check the "other" database self.assertTableNotExists("migrations_author", using="other") self.assertTableExists("migrations_tribble", using="other") # Fake a roll-back call_command("migrate", "migrations", "zero", fake=True, verbosity=0) call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other") # Make sure the tables still exist self.assertTableExists("migrations_author") self.assertTableExists("migrations_tribble") self.assertTableExists("migrations_tribble", using="other") # Try to run initial migration with self.assertRaises(DatabaseError): call_command("migrate", "migrations", "0001", verbosity=0) Loading @@ -101,18 +113,24 @@ class MigrateTests(MigrationTestBase): out = six.StringIO() with mock.patch('django.core.management.color.supports_color', lambda *args: False): call_command("migrate", "migrations", "0001", fake_initial=True, stdout=out, verbosity=1) call_command("migrate", "migrations", "0001", fake_initial=True, verbosity=0, database="other") self.assertIn( "migrations.0001_initial... faked", out.getvalue().lower() ) # Run migrations all the way call_command("migrate", verbosity=0) call_command("migrate", verbosity=0, database="other") # Make sure the right tables exist self.assertTableExists("migrations_author") self.assertTableNotExists("migrations_tribble") self.assertTableExists("migrations_book") self.assertTableNotExists("migrations_author", using="other") self.assertTableNotExists("migrations_tribble", using="other") self.assertTableNotExists("migrations_book", using="other") # Fake a roll-back call_command("migrate", "migrations", "zero", fake=True, verbosity=0) call_command("migrate", "migrations", "zero", fake=True, verbosity=0, database="other") # Make sure the tables still exist self.assertTableExists("migrations_author") self.assertTableNotExists("migrations_tribble") Loading @@ -127,12 +145,15 @@ class MigrateTests(MigrationTestBase): call_command("migrate", "migrations", fake_initial=True, verbosity=0) # Fake a apply call_command("migrate", "migrations", fake=True, verbosity=0) call_command("migrate", "migrations", fake=True, verbosity=0, database="other") # Unmigrate everything call_command("migrate", "migrations", "zero", verbosity=0) call_command("migrate", "migrations", "zero", verbosity=0, database="other") # Make sure it's all gone self.assertTableNotExists("migrations_author") self.assertTableNotExists("migrations_tribble") self.assertTableNotExists("migrations_book") for db in connections: self.assertTableNotExists("migrations_author", using=db) self.assertTableNotExists("migrations_tribble", using=db) self.assertTableNotExists("migrations_book", using=db) @override_settings(MIGRATION_MODULES={"migrations": "migrations.test_migrations_fake_split_initial"}) def test_migrate_fake_split_initial(self): Loading Loading @@ -1066,7 +1087,7 @@ class SquashMigrationsTests(MigrationTestBase): out = six.StringIO() with self.temporary_migration_module(module="migrations.test_migrations"): call_command("squashmigrations", "migrations", "0002", interactive=False, verbosity=1, stdout=out) self.assertIn("Optimized from 7 operations to 3 operations.", force_text(out.getvalue())) self.assertIn("Optimized from 8 operations to 3 operations.", force_text(out.getvalue())) def test_ticket_23799_squashmigrations_no_optimize(self): """ Loading
tests/migrations/test_migrations/0001_initial.py +5 −4 Original line number Diff line number Diff line Loading @@ -9,7 +9,6 @@ class Migration(migrations.Migration): initial = True operations = [ migrations.CreateModel( "Author", [ Loading @@ -20,7 +19,6 @@ class Migration(migrations.Migration): ("silly_field", models.BooleanField(default=False)), ], ), migrations.CreateModel( "Tribble", [ Loading @@ -28,10 +26,13 @@ class Migration(migrations.Migration): ("fluffy", models.BooleanField(default=True)), ], ), migrations.AddField( model_name='tribble', name='bool', field=models.BooleanField(default=False), ), migrations.AlterUniqueTogether( name='author', unique_together=set([('name', 'slug')]), ), ]