Loading django/core/management/commands/loaddata.py +166 −140 Original line number Diff line number Diff line from __future__ import unicode_literals import os import glob import gzip import os import zipfile from optparse import make_option import warnings Loading @@ -12,9 +13,9 @@ from django.core.management.base import BaseCommand, CommandError from django.core.management.color import no_style from django.db import (connections, router, transaction, DEFAULT_DB_ALIAS, IntegrityError, DatabaseError) from django.db.models import get_apps from django.db.models import get_app_paths from django.utils.encoding import force_text from django.utils._os import upath from django.utils.functional import cached_property, memoize from itertools import product try: Loading Loading @@ -44,9 +45,8 @@ class Command(BaseCommand): if not len(fixture_labels): raise CommandError( "No database fixture specified. Please provide the path of at " "least one fixture in the command line." ) "No database fixture specified. Please provide the path " "of at least one fixture in the command line.") self.verbosity = int(options.get('verbosity')) Loading @@ -69,37 +69,18 @@ class Command(BaseCommand): self.fixture_object_count = 0 self.models = set() class SingleZipReader(zipfile.ZipFile): def __init__(self, *args, **kwargs): zipfile.ZipFile.__init__(self, *args, **kwargs) if settings.DEBUG: assert len(self.namelist()) == 1, "Zip-compressed fixtures must contain only one file." def read(self): return zipfile.ZipFile.read(self, self.namelist()[0]) self.compression_types = { self.serialization_formats = serializers.get_public_serializer_formats() self.compression_formats = { None: open, 'gz': gzip.GzipFile, 'zip': SingleZipReader } if has_bz2: self.compression_types['bz2'] = bz2.BZ2File app_module_paths = [] for app in get_apps(): if hasattr(app, '__path__'): # It's a 'models/' subpackage for path in app.__path__: app_module_paths.append(upath(path)) else: # It's a models.py module app_module_paths.append(upath(app.__file__)) app_fixtures = [os.path.join(os.path.dirname(path), 'fixtures') for path in app_module_paths] self.compression_formats['bz2'] = bz2.BZ2File with connection.constraint_checks_disabled(): for fixture_label in fixture_labels: self.load_label(fixture_label, app_fixtures) self.load_label(fixture_label) # Since we disabled constraint checks, we must manually check for # any invalid keys that might have been added Loading @@ -124,97 +105,30 @@ class Command(BaseCommand): if self.verbosity >= 1: if self.fixture_object_count == self.loaded_object_count: self.stdout.write("Installed %d object(s) from %d fixture(s)" % ( self.loaded_object_count, self.fixture_count)) else: self.stdout.write("Installed %d object(s) (of %d) from %d fixture(s)" % ( self.loaded_object_count, self.fixture_object_count, self.fixture_count)) def load_label(self, fixture_label, app_fixtures): parts = fixture_label.split('.') if len(parts) > 1 and parts[-1] in self.compression_types: compression_formats = [parts[-1]] parts = parts[:-1] else: compression_formats = self.compression_types.keys() if len(parts) == 1: fixture_name = parts[0] formats = serializers.get_public_serializer_formats() else: fixture_name, format = '.'.join(parts[:-1]), parts[-1] if format in serializers.get_public_serializer_formats(): formats = [format] else: formats = [] if formats: if self.verbosity >= 2: self.stdout.write("Loading '%s' fixtures..." % fixture_name) else: raise CommandError( "Problem installing fixture '%s': %s is not a known serialization format." % (fixture_name, format)) if os.path.isabs(fixture_name): fixture_dirs = [fixture_name] else: fixture_dirs = app_fixtures + list(settings.FIXTURE_DIRS) + [''] label_found = False for fixture_dir in fixture_dirs: found = self.process_dir(fixture_dir, fixture_name, compression_formats, formats) label_found = label_found or found if fixture_name != 'initial_data' and not label_found: warnings.warn("No fixture named '%s' found." % fixture_name) def process_dir(self, fixture_dir, fixture_name, compression_formats, serialization_formats): humanize = lambda dirname: "'%s'" % dirname if dirname else 'absolute path' if self.verbosity >= 2: self.stdout.write("Checking %s for fixtures..." % humanize(fixture_dir)) label_found = False for combo in product([self.using, None], serialization_formats, compression_formats): database, format, compression_format = combo file_name = '.'.join( p for p in [ fixture_name, database, format, compression_format ] if p ) if self.verbosity >= 3: self.stdout.write("Trying %s for %s fixture '%s'..." % \ (humanize(fixture_dir), file_name, fixture_name)) full_path = os.path.join(fixture_dir, file_name) open_method = self.compression_types[compression_format] try: fixture = open_method(full_path, 'r') except IOError: if self.verbosity >= 2: self.stdout.write("No %s fixture '%s' in %s." % \ (format, fixture_name, humanize(fixture_dir))) self.stdout.write("Installed %d object(s) from %d fixture(s)" % (self.loaded_object_count, self.fixture_count)) else: self.stdout.write("Installed %d object(s) (of %d) from %d fixture(s)" % (self.loaded_object_count, self.fixture_object_count, self.fixture_count)) def load_label(self, fixture_label): """ Loads fixtures files for a given label. """ for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label): _, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file)) open_method = self.compression_formats[cmp_fmt] fixture = open_method(fixture_file, 'r') try: if label_found: raise CommandError("Multiple fixtures named '%s' in %s. Aborting." % (fixture_name, humanize(fixture_dir))) self.fixture_count += 1 objects_in_fixture = 0 loaded_objects_in_fixture = 0 if self.verbosity >= 2: self.stdout.write("Installing %s fixture '%s' from %s." % \ (format, fixture_name, humanize(fixture_dir))) self.stdout.write("Installing %s fixture '%s' from %s." % (ser_fmt, fixture_name, humanize(fixture_dir))) objects = serializers.deserialize(format, fixture, using=self.using, ignorenonexistent=self.ignore) objects = serializers.deserialize(ser_fmt, fixture, using=self.using, ignorenonexistent=self.ignore) for obj in objects: objects_in_fixture += 1 Loading @@ -234,10 +148,9 @@ class Command(BaseCommand): self.loaded_object_count += loaded_objects_in_fixture self.fixture_object_count += objects_in_fixture label_found = True except Exception as e: if not isinstance(e, CommandError): e.args = ("Problem installing fixture '%s': %s" % (full_path, e),) e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),) raise finally: fixture.close() Loading @@ -246,7 +159,120 @@ class Command(BaseCommand): # error was encountered during fixture loading. if objects_in_fixture == 0: raise CommandError( "No fixture data found for '%s'. (File format may be invalid.)" % (fixture_name)) "No fixture data found for '%s'. " "(File format may be invalid.)" % fixture_name) def _find_fixtures(self, fixture_label): """ Finds fixture files for a given label. """ fixture_name, ser_fmt, cmp_fmt = self.parse_name(fixture_label) databases = [self.using, None] cmp_fmts = list(self.compression_formats.keys()) if cmp_fmt is None else [cmp_fmt] ser_fmts = serializers.get_public_serializer_formats() if ser_fmt is None else [ser_fmt] # Check kept for backwards-compatibility; it doesn't look very useful. if '.' in fixture_name: raise CommandError( "Problem installing fixture '%s': %s is not a known " "serialization format." % tuple(fixture_name.rsplit('.'))) if self.verbosity >= 2: self.stdout.write("Loading '%s' fixtures..." % fixture_name) if os.path.isabs(fixture_name): fixture_dirs = [os.path.dirname(fixture_name)] fixture_name = os.path.basename(fixture_name) else: fixture_dirs = self.fixture_dirs suffixes = ('.'.join(ext for ext in combo if ext) for combo in product(databases, ser_fmts, cmp_fmts)) targets = set('.'.join((fixture_name, suffix)) for suffix in suffixes) fixture_files = [] for fixture_dir in fixture_dirs: if self.verbosity >= 2: self.stdout.write("Checking %s for fixtures..." % humanize(fixture_dir)) fixture_files_in_dir = [] for candidate in glob.iglob(os.path.join(fixture_dir, fixture_name + '*')): if os.path.basename(candidate) in targets: # Save the fixture_dir and fixture_name for future error messages. fixture_files_in_dir.append((candidate, fixture_dir, fixture_name)) if self.verbosity >= 2 and not fixture_files_in_dir: self.stdout.write("No fixture '%s' in %s." % (fixture_name, humanize(fixture_dir))) # Check kept for backwards-compatibility; it isn't clear why # duplicates are only allowed in different directories. if len(fixture_files_in_dir) > 1: raise CommandError( "Multiple fixtures named '%s' in %s. Aborting." % (fixture_name, humanize(fixture_dir))) fixture_files.extend(fixture_files_in_dir) if fixture_name != 'initial_data' and not fixture_files: # Warning kept for backwards-compatibility; why not an exception? warnings.warn("No fixture named '%s' found." % fixture_name) return fixture_files _label_to_fixtures_cache = {} find_fixtures = memoize(_find_fixtures, _label_to_fixtures_cache, 2) @cached_property def fixture_dirs(self): """ Return a list of fixture directories. The list contains the 'fixtures' subdirectory of each installed application, if it exists, the directories in FIXTURE_DIRS, and the current directory. """ dirs = [] for path in get_app_paths(): d = os.path.join(os.path.dirname(path), 'fixtures') if os.path.isdir(d): dirs.append(d) dirs.extend(list(settings.FIXTURE_DIRS)) dirs.append('') dirs = [os.path.abspath(os.path.realpath(d)) for d in dirs] return dirs def parse_name(self, fixture_name): """ Splits fixture name in name, serialization format, compression format. """ parts = fixture_name.rsplit('.', 2) if len(parts) > 1 and parts[-1] in self.compression_formats: cmp_fmt = parts[-1] parts = parts[:-1] else: cmp_fmt = None if len(parts) > 1 and parts[-1] in self.serialization_formats: ser_fmt = parts[-1] parts = parts[:-1] else: ser_fmt = None name = '.'.join(parts) return name, ser_fmt, cmp_fmt class SingleZipReader(zipfile.ZipFile): def __init__(self, *args, **kwargs): zipfile.ZipFile.__init__(self, *args, **kwargs) if len(self.namelist()) != 1: raise ValueError("Zip-compressed fixtures must contain one file.") def read(self): return zipfile.ZipFile.read(self, self.namelist()[0]) return label_found def humanize(dirname): return "'%s'" % dirname if dirname else 'absolute path' tests/fixtures_regress/tests.py +1 −1 Original line number Diff line number Diff line Loading @@ -450,7 +450,7 @@ class TestFixtures(TestCase): commit=False, stdout=stdout_output, ) self.assertTrue("No xml fixture 'this_fixture_doesnt_exist' in" in self.assertTrue("No fixture 'this_fixture_doesnt_exist' in" in force_text(stdout_output.getvalue())) Loading Loading
django/core/management/commands/loaddata.py +166 −140 Original line number Diff line number Diff line from __future__ import unicode_literals import os import glob import gzip import os import zipfile from optparse import make_option import warnings Loading @@ -12,9 +13,9 @@ from django.core.management.base import BaseCommand, CommandError from django.core.management.color import no_style from django.db import (connections, router, transaction, DEFAULT_DB_ALIAS, IntegrityError, DatabaseError) from django.db.models import get_apps from django.db.models import get_app_paths from django.utils.encoding import force_text from django.utils._os import upath from django.utils.functional import cached_property, memoize from itertools import product try: Loading Loading @@ -44,9 +45,8 @@ class Command(BaseCommand): if not len(fixture_labels): raise CommandError( "No database fixture specified. Please provide the path of at " "least one fixture in the command line." ) "No database fixture specified. Please provide the path " "of at least one fixture in the command line.") self.verbosity = int(options.get('verbosity')) Loading @@ -69,37 +69,18 @@ class Command(BaseCommand): self.fixture_object_count = 0 self.models = set() class SingleZipReader(zipfile.ZipFile): def __init__(self, *args, **kwargs): zipfile.ZipFile.__init__(self, *args, **kwargs) if settings.DEBUG: assert len(self.namelist()) == 1, "Zip-compressed fixtures must contain only one file." def read(self): return zipfile.ZipFile.read(self, self.namelist()[0]) self.compression_types = { self.serialization_formats = serializers.get_public_serializer_formats() self.compression_formats = { None: open, 'gz': gzip.GzipFile, 'zip': SingleZipReader } if has_bz2: self.compression_types['bz2'] = bz2.BZ2File app_module_paths = [] for app in get_apps(): if hasattr(app, '__path__'): # It's a 'models/' subpackage for path in app.__path__: app_module_paths.append(upath(path)) else: # It's a models.py module app_module_paths.append(upath(app.__file__)) app_fixtures = [os.path.join(os.path.dirname(path), 'fixtures') for path in app_module_paths] self.compression_formats['bz2'] = bz2.BZ2File with connection.constraint_checks_disabled(): for fixture_label in fixture_labels: self.load_label(fixture_label, app_fixtures) self.load_label(fixture_label) # Since we disabled constraint checks, we must manually check for # any invalid keys that might have been added Loading @@ -124,97 +105,30 @@ class Command(BaseCommand): if self.verbosity >= 1: if self.fixture_object_count == self.loaded_object_count: self.stdout.write("Installed %d object(s) from %d fixture(s)" % ( self.loaded_object_count, self.fixture_count)) else: self.stdout.write("Installed %d object(s) (of %d) from %d fixture(s)" % ( self.loaded_object_count, self.fixture_object_count, self.fixture_count)) def load_label(self, fixture_label, app_fixtures): parts = fixture_label.split('.') if len(parts) > 1 and parts[-1] in self.compression_types: compression_formats = [parts[-1]] parts = parts[:-1] else: compression_formats = self.compression_types.keys() if len(parts) == 1: fixture_name = parts[0] formats = serializers.get_public_serializer_formats() else: fixture_name, format = '.'.join(parts[:-1]), parts[-1] if format in serializers.get_public_serializer_formats(): formats = [format] else: formats = [] if formats: if self.verbosity >= 2: self.stdout.write("Loading '%s' fixtures..." % fixture_name) else: raise CommandError( "Problem installing fixture '%s': %s is not a known serialization format." % (fixture_name, format)) if os.path.isabs(fixture_name): fixture_dirs = [fixture_name] else: fixture_dirs = app_fixtures + list(settings.FIXTURE_DIRS) + [''] label_found = False for fixture_dir in fixture_dirs: found = self.process_dir(fixture_dir, fixture_name, compression_formats, formats) label_found = label_found or found if fixture_name != 'initial_data' and not label_found: warnings.warn("No fixture named '%s' found." % fixture_name) def process_dir(self, fixture_dir, fixture_name, compression_formats, serialization_formats): humanize = lambda dirname: "'%s'" % dirname if dirname else 'absolute path' if self.verbosity >= 2: self.stdout.write("Checking %s for fixtures..." % humanize(fixture_dir)) label_found = False for combo in product([self.using, None], serialization_formats, compression_formats): database, format, compression_format = combo file_name = '.'.join( p for p in [ fixture_name, database, format, compression_format ] if p ) if self.verbosity >= 3: self.stdout.write("Trying %s for %s fixture '%s'..." % \ (humanize(fixture_dir), file_name, fixture_name)) full_path = os.path.join(fixture_dir, file_name) open_method = self.compression_types[compression_format] try: fixture = open_method(full_path, 'r') except IOError: if self.verbosity >= 2: self.stdout.write("No %s fixture '%s' in %s." % \ (format, fixture_name, humanize(fixture_dir))) self.stdout.write("Installed %d object(s) from %d fixture(s)" % (self.loaded_object_count, self.fixture_count)) else: self.stdout.write("Installed %d object(s) (of %d) from %d fixture(s)" % (self.loaded_object_count, self.fixture_object_count, self.fixture_count)) def load_label(self, fixture_label): """ Loads fixtures files for a given label. """ for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label): _, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file)) open_method = self.compression_formats[cmp_fmt] fixture = open_method(fixture_file, 'r') try: if label_found: raise CommandError("Multiple fixtures named '%s' in %s. Aborting." % (fixture_name, humanize(fixture_dir))) self.fixture_count += 1 objects_in_fixture = 0 loaded_objects_in_fixture = 0 if self.verbosity >= 2: self.stdout.write("Installing %s fixture '%s' from %s." % \ (format, fixture_name, humanize(fixture_dir))) self.stdout.write("Installing %s fixture '%s' from %s." % (ser_fmt, fixture_name, humanize(fixture_dir))) objects = serializers.deserialize(format, fixture, using=self.using, ignorenonexistent=self.ignore) objects = serializers.deserialize(ser_fmt, fixture, using=self.using, ignorenonexistent=self.ignore) for obj in objects: objects_in_fixture += 1 Loading @@ -234,10 +148,9 @@ class Command(BaseCommand): self.loaded_object_count += loaded_objects_in_fixture self.fixture_object_count += objects_in_fixture label_found = True except Exception as e: if not isinstance(e, CommandError): e.args = ("Problem installing fixture '%s': %s" % (full_path, e),) e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),) raise finally: fixture.close() Loading @@ -246,7 +159,120 @@ class Command(BaseCommand): # error was encountered during fixture loading. if objects_in_fixture == 0: raise CommandError( "No fixture data found for '%s'. (File format may be invalid.)" % (fixture_name)) "No fixture data found for '%s'. " "(File format may be invalid.)" % fixture_name) def _find_fixtures(self, fixture_label): """ Finds fixture files for a given label. """ fixture_name, ser_fmt, cmp_fmt = self.parse_name(fixture_label) databases = [self.using, None] cmp_fmts = list(self.compression_formats.keys()) if cmp_fmt is None else [cmp_fmt] ser_fmts = serializers.get_public_serializer_formats() if ser_fmt is None else [ser_fmt] # Check kept for backwards-compatibility; it doesn't look very useful. if '.' in fixture_name: raise CommandError( "Problem installing fixture '%s': %s is not a known " "serialization format." % tuple(fixture_name.rsplit('.'))) if self.verbosity >= 2: self.stdout.write("Loading '%s' fixtures..." % fixture_name) if os.path.isabs(fixture_name): fixture_dirs = [os.path.dirname(fixture_name)] fixture_name = os.path.basename(fixture_name) else: fixture_dirs = self.fixture_dirs suffixes = ('.'.join(ext for ext in combo if ext) for combo in product(databases, ser_fmts, cmp_fmts)) targets = set('.'.join((fixture_name, suffix)) for suffix in suffixes) fixture_files = [] for fixture_dir in fixture_dirs: if self.verbosity >= 2: self.stdout.write("Checking %s for fixtures..." % humanize(fixture_dir)) fixture_files_in_dir = [] for candidate in glob.iglob(os.path.join(fixture_dir, fixture_name + '*')): if os.path.basename(candidate) in targets: # Save the fixture_dir and fixture_name for future error messages. fixture_files_in_dir.append((candidate, fixture_dir, fixture_name)) if self.verbosity >= 2 and not fixture_files_in_dir: self.stdout.write("No fixture '%s' in %s." % (fixture_name, humanize(fixture_dir))) # Check kept for backwards-compatibility; it isn't clear why # duplicates are only allowed in different directories. if len(fixture_files_in_dir) > 1: raise CommandError( "Multiple fixtures named '%s' in %s. Aborting." % (fixture_name, humanize(fixture_dir))) fixture_files.extend(fixture_files_in_dir) if fixture_name != 'initial_data' and not fixture_files: # Warning kept for backwards-compatibility; why not an exception? warnings.warn("No fixture named '%s' found." % fixture_name) return fixture_files _label_to_fixtures_cache = {} find_fixtures = memoize(_find_fixtures, _label_to_fixtures_cache, 2) @cached_property def fixture_dirs(self): """ Return a list of fixture directories. The list contains the 'fixtures' subdirectory of each installed application, if it exists, the directories in FIXTURE_DIRS, and the current directory. """ dirs = [] for path in get_app_paths(): d = os.path.join(os.path.dirname(path), 'fixtures') if os.path.isdir(d): dirs.append(d) dirs.extend(list(settings.FIXTURE_DIRS)) dirs.append('') dirs = [os.path.abspath(os.path.realpath(d)) for d in dirs] return dirs def parse_name(self, fixture_name): """ Splits fixture name in name, serialization format, compression format. """ parts = fixture_name.rsplit('.', 2) if len(parts) > 1 and parts[-1] in self.compression_formats: cmp_fmt = parts[-1] parts = parts[:-1] else: cmp_fmt = None if len(parts) > 1 and parts[-1] in self.serialization_formats: ser_fmt = parts[-1] parts = parts[:-1] else: ser_fmt = None name = '.'.join(parts) return name, ser_fmt, cmp_fmt class SingleZipReader(zipfile.ZipFile): def __init__(self, *args, **kwargs): zipfile.ZipFile.__init__(self, *args, **kwargs) if len(self.namelist()) != 1: raise ValueError("Zip-compressed fixtures must contain one file.") def read(self): return zipfile.ZipFile.read(self, self.namelist()[0]) return label_found def humanize(dirname): return "'%s'" % dirname if dirname else 'absolute path'
tests/fixtures_regress/tests.py +1 −1 Original line number Diff line number Diff line Loading @@ -450,7 +450,7 @@ class TestFixtures(TestCase): commit=False, stdout=stdout_output, ) self.assertTrue("No xml fixture 'this_fixture_doesnt_exist' in" in self.assertTrue("No fixture 'this_fixture_doesnt_exist' in" in force_text(stdout_output.getvalue())) Loading