Commit 6a8cfbf0 authored by Andrew Godwin's avatar Andrew Godwin
Browse files

Support for index_together in schema backends

parent 3b20af3e
Loading
Loading
Loading
Loading
+35 −1
Original line number Diff line number Diff line
@@ -84,7 +84,7 @@ class BaseDatabaseSchemaEditor(object):
        # Get the cursor
        cursor = self.connection.cursor()
        # Log the command we're running, then run it
        logger.info("%s; (params %r)" % (sql, params))
        logger.debug("%s; (params %r)" % (sql, params))
        cursor.execute(sql, params)

    def quote_name(self, name):
@@ -253,6 +253,40 @@ class BaseDatabaseSchemaEditor(object):
                "columns": ", ".join(self.quote_name(column) for column in columns),
            })

    def alter_index_together(self, model, old_index_together, new_index_together):
        """
        Deals with a model changing its index_together.
        Note: The input index_togethers must be doubly-nested, not the single-
        nested ["foo", "bar"] format.
        """
        olds = set(frozenset(fields) for fields in old_index_together)
        news = set(frozenset(fields) for fields in new_index_together)
        # Deleted indexes
        for fields in olds.difference(news):
            columns = [model._meta.get_field_by_name(field)[0].column for field in fields]
            constraint_names = self._constraint_names(model, list(columns), index=True)
            if len(constraint_names) != 1:
                raise ValueError("Found wrong number (%s) of constraints for %s(%s)" % (
                    len(constraint_names),
                    model._meta.db_table,
                    ", ".join(columns),
                ))
            self.execute(
                self.sql_delete_index % {
                    "table": self.quote_name(model._meta.db_table),
                    "name": constraint_names[0],
                },
            )
        # Created indexes
        for fields in news.difference(olds):
            columns = [model._meta.get_field_by_name(field)[0].column for field in fields]
            self.execute(self.sql_create_index % {
                "table": self.quote_name(model._meta.db_table),
                "name": self._create_index_name(model, columns, suffix="_idx"),
                "columns": ", ".join(self.quote_name(column) for column in columns),
                "extra": "",
            })

    def alter_db_table(self, model, old_db_table, new_db_table):
        """
        Renames the table a model points to.
+51 −0
Original line number Diff line number Diff line
@@ -452,6 +452,57 @@ class SchemaTests(TransactionTestCase):
        self.assertRaises(IntegrityError, UniqueTest.objects.create, year=2012, slug="foo")
        UniqueTest.objects.all().delete()

    def test_index_together(self):
        """
        Tests removing and adding index_together constraints on a model.
        """
        # Create the table
        with connection.schema_editor() as editor:
            editor.create_model(Tag)
        # Ensure there's no index on the year/slug columns first
        self.assertEqual(
            False,
            any(
                c["index"]
                for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
                if c['columns'] == set(["slug", "title"])
            ),
        )
        # Alter the model to add an index
        with connection.schema_editor() as editor:
            editor.alter_index_together(
                Tag,
                [],
                [("slug", "title")],
            )
        # Ensure there is now an index
        self.assertEqual(
            True,
            any(
                c["index"]
                for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
                if c['columns'] == set(["slug", "title"])
            ),
        )
        # Alter it back
        new_new_field = SlugField(unique=True)
        new_new_field.set_attributes_from_name("slug")
        with connection.schema_editor() as editor:
            editor.alter_unique_together(
                Tag,
                [("slug", "title")],
                [],
            )
        # Ensure there's no index
        self.assertEqual(
            False,
            any(
                c["index"]
                for c in connection.introspection.get_constraints(connection.cursor(), "schema_tag").values()
                if c['columns'] == set(["slug", "title"])
            ),
        )

    def test_db_table(self):
        """
        Tests renaming of the table