Loading django/db/backends/mysql/introspection.py +6 −1 Original line number Diff line number Diff line Loading @@ -99,7 +99,12 @@ class DatabaseIntrospection(BaseDatabaseIntrospection): for row in rows: if row[2] in multicol_indexes: continue indexes[row[4]] = {'primary_key': (row[2] == 'PRIMARY'), 'unique': not bool(row[1])} if row[4] not in indexes: indexes[row[4]] = {'primary_key': False, 'unique': False} if row[2] == 'PRIMARY': indexes[row[4]]['primary_key'] = True if not bool(row[1]): indexes[row[4]]['unique'] = True return indexes def get_constraints(self, cursor, table_name): Loading django/db/backends/mysql/schema.py +3 −0 Original line number Diff line number Diff line Loading @@ -21,3 +21,6 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): alter_string_set_null = 'MODIFY %(column)s %(type)s NULL;' alter_string_drop_null = 'MODIFY %(column)s %(type)s NOT NULL;' sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY" django/db/backends/schema.py +35 −10 Original line number Diff line number Diff line Loading @@ -24,8 +24,6 @@ class BaseDatabaseSchemaEditor(object): - Repointing of FKs - Repointing of M2Ms - Check constraints (PosIntField) - PK changing - db_index on alter field """ # Overrideable SQL templates Loading Loading @@ -57,8 +55,8 @@ class BaseDatabaseSchemaEditor(object): sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s;" sql_delete_index = "DROP INDEX %(name)s" sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s" sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s" def __init__(self, connection): self.connection = connection Loading Loading @@ -135,9 +133,10 @@ class BaseDatabaseSchemaEditor(object): elif field.unique: sql += " UNIQUE" # If we were told to include a default value, do so if include_default: default_value = self.effective_default(field) if include_default and default_value is not None: sql += " DEFAULT %s" params += [self.effective_default(field)] params += [default_value] # Return the sql return sql, params Loading Loading @@ -491,6 +490,32 @@ class BaseDatabaseSchemaEditor(object): "extra": "", } ) # Changed to become primary key? # Note that we don't detect unsetting of a PK, as we assume another field # will always come along and replace it. if not old_field.primary_key and new_field.primary_key: # First, drop the old PK constraint_names = self._constraint_names(model, primary_key=True) if strict and len(constraint_names) != 1: raise ValueError("Found wrong number (%s) of PK constraints for %s" % ( len(constraint_names), model._meta.db_table, )) for constraint_name in constraint_names: self.execute( self.sql_delete_pk % { "table": self.quote_name(model._meta.db_table), "name": constraint_name, }, ) # Make the new one self.execute( self.sql_create_pk % { "table": self.quote_name(model._meta.db_table), "name": self._create_index_name(model, [new_field.column], suffix="_pk"), "columns": self.quote_name(new_field.column), } ) def _type_for_alter(self, field): """ Loading Loading @@ -518,16 +543,16 @@ class BaseDatabaseSchemaEditor(object): index_name = '%s%s' % (table_name[:(self.connection.features.max_index_name_length - len(part))], part) return index_name def _constraint_names(self, model, column_names, unique=None, primary_key=None, index=None): def _constraint_names(self, model, column_names=None, unique=None, primary_key=None, index=None): "Returns all constraint names matching the columns and conditions" column_names = set(column_names) column_names = set(column_names) if column_names else None constraints = self.connection.introspection.get_constraints(self.connection.cursor(), model._meta.db_table) result = [] for name, infodict in constraints.items(): if column_names == infodict['columns']: if column_names is None or column_names == infodict['columns']: if unique is not None and infodict['unique'] != unique: continue if primary_key is not None and infodict['primary_key'] != unique: if primary_key is not None and infodict['primary_key'] != primary_key: continue if index is not None and infodict['index'] != index: continue Loading tests/modeltests/schema/tests.py +34 −0 Original line number Diff line number Diff line Loading @@ -478,3 +478,37 @@ class SchemaTests(TestCase): "slug", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), ) def test_primary_key(self): """ Tests altering of the primary key """ # Create the table editor = connection.schema_editor() editor.start() editor.create_model(Tag) editor.commit() # Ensure the table is there and has the right PK self.assertTrue( connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['id']['primary_key'], ) # Alter to change the PK new_field = SlugField(primary_key=True) new_field.set_attributes_from_name("slug") editor = connection.schema_editor() editor.start() editor.delete_field(Tag, Tag._meta.get_field_by_name("id")[0]) editor.alter_field( Tag, Tag._meta.get_field_by_name("slug")[0], new_field, ) editor.commit() # Ensure the PK changed self.assertNotIn( 'id', connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table), ) self.assertTrue( connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['slug']['primary_key'], ) Loading
django/db/backends/mysql/introspection.py +6 −1 Original line number Diff line number Diff line Loading @@ -99,7 +99,12 @@ class DatabaseIntrospection(BaseDatabaseIntrospection): for row in rows: if row[2] in multicol_indexes: continue indexes[row[4]] = {'primary_key': (row[2] == 'PRIMARY'), 'unique': not bool(row[1])} if row[4] not in indexes: indexes[row[4]] = {'primary_key': False, 'unique': False} if row[2] == 'PRIMARY': indexes[row[4]]['primary_key'] = True if not bool(row[1]): indexes[row[4]]['unique'] = True return indexes def get_constraints(self, cursor, table_name): Loading
django/db/backends/mysql/schema.py +3 −0 Original line number Diff line number Diff line Loading @@ -21,3 +21,6 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): alter_string_set_null = 'MODIFY %(column)s %(type)s NULL;' alter_string_drop_null = 'MODIFY %(column)s %(type)s NOT NULL;' sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP PRIMARY KEY"
django/db/backends/schema.py +35 −10 Original line number Diff line number Diff line Loading @@ -24,8 +24,6 @@ class BaseDatabaseSchemaEditor(object): - Repointing of FKs - Repointing of M2Ms - Check constraints (PosIntField) - PK changing - db_index on alter field """ # Overrideable SQL templates Loading Loading @@ -57,8 +55,8 @@ class BaseDatabaseSchemaEditor(object): sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s;" sql_delete_index = "DROP INDEX %(name)s" sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s" sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" sql_delete_pk = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s" def __init__(self, connection): self.connection = connection Loading Loading @@ -135,9 +133,10 @@ class BaseDatabaseSchemaEditor(object): elif field.unique: sql += " UNIQUE" # If we were told to include a default value, do so if include_default: default_value = self.effective_default(field) if include_default and default_value is not None: sql += " DEFAULT %s" params += [self.effective_default(field)] params += [default_value] # Return the sql return sql, params Loading Loading @@ -491,6 +490,32 @@ class BaseDatabaseSchemaEditor(object): "extra": "", } ) # Changed to become primary key? # Note that we don't detect unsetting of a PK, as we assume another field # will always come along and replace it. if not old_field.primary_key and new_field.primary_key: # First, drop the old PK constraint_names = self._constraint_names(model, primary_key=True) if strict and len(constraint_names) != 1: raise ValueError("Found wrong number (%s) of PK constraints for %s" % ( len(constraint_names), model._meta.db_table, )) for constraint_name in constraint_names: self.execute( self.sql_delete_pk % { "table": self.quote_name(model._meta.db_table), "name": constraint_name, }, ) # Make the new one self.execute( self.sql_create_pk % { "table": self.quote_name(model._meta.db_table), "name": self._create_index_name(model, [new_field.column], suffix="_pk"), "columns": self.quote_name(new_field.column), } ) def _type_for_alter(self, field): """ Loading Loading @@ -518,16 +543,16 @@ class BaseDatabaseSchemaEditor(object): index_name = '%s%s' % (table_name[:(self.connection.features.max_index_name_length - len(part))], part) return index_name def _constraint_names(self, model, column_names, unique=None, primary_key=None, index=None): def _constraint_names(self, model, column_names=None, unique=None, primary_key=None, index=None): "Returns all constraint names matching the columns and conditions" column_names = set(column_names) column_names = set(column_names) if column_names else None constraints = self.connection.introspection.get_constraints(self.connection.cursor(), model._meta.db_table) result = [] for name, infodict in constraints.items(): if column_names == infodict['columns']: if column_names is None or column_names == infodict['columns']: if unique is not None and infodict['unique'] != unique: continue if primary_key is not None and infodict['primary_key'] != unique: if primary_key is not None and infodict['primary_key'] != primary_key: continue if index is not None and infodict['index'] != index: continue Loading
tests/modeltests/schema/tests.py +34 −0 Original line number Diff line number Diff line Loading @@ -478,3 +478,37 @@ class SchemaTests(TestCase): "slug", connection.introspection.get_indexes(connection.cursor(), Book._meta.db_table), ) def test_primary_key(self): """ Tests altering of the primary key """ # Create the table editor = connection.schema_editor() editor.start() editor.create_model(Tag) editor.commit() # Ensure the table is there and has the right PK self.assertTrue( connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['id']['primary_key'], ) # Alter to change the PK new_field = SlugField(primary_key=True) new_field.set_attributes_from_name("slug") editor = connection.schema_editor() editor.start() editor.delete_field(Tag, Tag._meta.get_field_by_name("id")[0]) editor.alter_field( Tag, Tag._meta.get_field_by_name("slug")[0], new_field, ) editor.commit() # Ensure the PK changed self.assertNotIn( 'id', connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table), ) self.assertTrue( connection.introspection.get_indexes(connection.cursor(), Tag._meta.db_table)['slug']['primary_key'], )