(self)
| 31 | return self.apps_config.get(app_label, {}).get("migrations") |
| 32 | |
def load_disk(self) -> None:
    """Discover and instantiate the migrations available for each app.

    Resets ``disk_migrations``, ``unmigrated_apps`` and ``migrated_apps``,
    then walks every app label in ``self.apps_config``:

    * An app for which ``self.migrations_module`` returns a falsy value,
      or whose migrations module is not a package (has no ``__path__``),
      is recorded in ``unmigrated_apps``.
    * Otherwise the app is recorded in ``migrated_apps`` and every
      non-package submodule whose name does not start with ``_`` or ``~``
      is imported, its ``Migration`` class is instantiated with
      ``(migration_name, app_label)``, and the instance is stored in
      ``disk_migrations`` under ``MigrationKey(app_label=..., name=...)``.

    Raises:
        ModuleNotFoundError: If a configured migrations module (or one of
            its migration submodules) cannot be imported. The error is
            deliberately left to propagate.
        ValueError: If a migration submodule defines no ``Migration``
            attribute.
    """
    self.disk_migrations = {}
    self.unmigrated_apps = set()
    self.migrated_apps = set()

    for app_label in self.apps_config:
        module_name = self.migrations_module(app_label)
        if not module_name:
            self.unmigrated_apps.add(app_label)
            continue

        # Record this *before* importing: afterwards the module is always
        # present in sys.modules and the information would be lost.
        was_loaded = module_name in sys.modules

        # Import errors propagate to the caller unchanged; there is no
        # fallback for a configured-but-missing migrations module.
        module = import_module(module_name)

        # A plain module (no __path__) cannot contain migration submodules.
        if not hasattr(module, "__path__"):
            self.unmigrated_apps.add(app_label)
            continue

        # The package was imported earlier in this process, so re-execute
        # it rather than relying on the cached module object.
        if was_loaded:
            reload(module)

        self.migrated_apps.add(app_label)

        # Only plain modules count as migrations; sub-packages and names
        # starting with "_" or "~" are skipped.
        migration_names = [
            name
            for _, name, is_pkg in pkgutil.iter_modules(module.__path__)
            if not is_pkg and not name.startswith(("_", "~"))
        ]

        for migration_name in migration_names:
            migration_module = import_module(f"{module_name}.{migration_name}")
            if not hasattr(migration_module, "Migration"):
                raise ValueError(
                    f"Migration {migration_name} in app {app_label} has no Migration class"
                )
            key = MigrationKey(app_label=app_label, name=migration_name)
            self.disk_migrations[key] = migration_module.Migration(
                migration_name, app_label
            )
| 72 | |
| 73 | async def build_graph(self) -> None: |
| 74 | self.load_disk() |
no test coverage detected