| 37 | |
| 38 | |
| 39 | class MigrationExecutor: |
def __init__(self, connection: BaseDBAsyncClient, apps_config: dict[str, dict]) -> None:
    """Set up the executor's collaborators for one database connection.

    Args:
        connection: Database client kept on the executor and handed to the
            ``MigrationRecorder``.
        apps_config: Mapping passed through unchanged to the
            ``MigrationLoader``.
    """
    recorder = MigrationRecorder(connection)
    # load=False is forwarded as-is; migrate() awaits
    # self.loader.build_graph() explicitly before planning.
    loader = MigrationLoader(apps_config, recorder, load=False)

    self.connection = connection
    self.recorder = recorder
    self.loader = loader
    # Starts empty; nothing in this constructor populates the plan cache.
    self._full_plan_cache: list[MigrationKey] | None = None
    self._logger = logging.getLogger(__name__)
| 46 | |
| 47 | async def migrate( |
| 48 | self, |
| 49 | targets: Iterable[MigrationTarget] | None = None, |
| 50 | *, |
| 51 | fake: bool = False, |
| 52 | dry_run: bool = False, |
| 53 | direction: str = "both", |
| 54 | progress: Callable[[str, str, str], object] | None = None, |
| 55 | ) -> None: |
| 56 | self._logger.debug("Building migration graph") |
| 57 | await self.loader.build_graph() |
| 58 | |
| 59 | # Create a non-atomic schema editor for recorder operations only |
| 60 | recorder_schema_editor = self._schema_editor(atomic=False) |
| 61 | self._logger.debug("Ensuring migration schema") |
| 62 | await self.recorder.ensure_schema(recorder_schema_editor) |
| 63 | |
| 64 | self._logger.debug("Loading applied migrations") |
| 65 | applied = set(await self.recorder.applied_migrations()) |
| 66 | |
| 67 | self._logger.debug("Building migration plan") |
| 68 | plan = self._migration_plan(targets, applied, self.loader.graph) |
| 69 | self._validate_plan_direction(plan, direction) |
| 70 | |
| 71 | state_cache_by_key: dict[MigrationKey, State] | None = None |
| 72 | if any(step.backward for step in plan): |
| 73 | self._logger.debug("Building rollback state cache") |
| 74 | state_cache_by_key = await self._project_state_cache(applied) |
| 75 | |
| 76 | state_cache: State | None = None |
| 77 | for step in plan: |
| 78 | key = MigrationKey(app_label=step.migration.app_label, name=step.migration.name) |
| 79 | if step.backward: |
| 80 | if state_cache_by_key is not None: |
| 81 | state_before = state_cache_by_key[key] |
| 82 | else: |
| 83 | state_before = await self._project_state(applied, upto=key) |
| 84 | if not fake: |
| 85 | self._emit(progress, "rollback_start", key) |
| 86 | schema_editor = self._schema_editor(atomic=step.migration.atomic) |
| 87 | if schema_editor.atomic_migration: |
| 88 | async with in_transaction(self.connection.connection_name) as txn_client: |
| 89 | schema_editor.client = txn_client |
| 90 | await step.migration.unapply( |
| 91 | state_before, dry_run=dry_run, schema_editor=schema_editor |
| 92 | ) |
| 93 | else: |
| 94 | await step.migration.unapply( |
| 95 | state_before, dry_run=dry_run, schema_editor=schema_editor |
| 96 | ) |
no outgoing calls
searching dependent graphs…