MCPcopy
hub / github.com/tortoise/tortoise-orm / MigrationExecutor

Class MigrationExecutor

tortoise/migrations/executor.py:39–364  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

37
38
39class MigrationExecutor:
def __init__(self, connection: BaseDBAsyncClient, apps_config: dict[str, dict]) -> None:
    """Wire up the collaborators the executor works with.

    Args:
        connection: Database client; stored on the executor and passed to
            the ``MigrationRecorder``.
        apps_config: Per-app configuration mapping, passed through to the
            ``MigrationLoader``.
    """
    # Bookkeeping attributes that do not depend on the arguments.
    self._logger = logging.getLogger(__name__)
    self._full_plan_cache: list[MigrationKey] | None = None

    self.connection = connection

    # The loader is handed the same recorder instance the executor keeps.
    recorder = MigrationRecorder(connection)
    self.recorder = recorder
    # NOTE(review): load=False presumably defers loading until the graph is
    # built explicitly -- confirm against MigrationLoader.
    self.loader = MigrationLoader(apps_config, recorder, load=False)
46
47 async def migrate(
48 self,
49 targets: Iterable[MigrationTarget] | None = None,
50 *,
51 fake: bool = False,
52 dry_run: bool = False,
53 direction: str = "both",
54 progress: Callable[[str, str, str], object] | None = None,
55 ) -> None:
56 self._logger.debug("Building migration graph")
57 await self.loader.build_graph()
58
59 # Create a non-atomic schema editor for recorder operations only
60 recorder_schema_editor = self._schema_editor(atomic=False)
61 self._logger.debug("Ensuring migration schema")
62 await self.recorder.ensure_schema(recorder_schema_editor)
63
64 self._logger.debug("Loading applied migrations")
65 applied = set(await self.recorder.applied_migrations())
66
67 self._logger.debug("Building migration plan")
68 plan = self._migration_plan(targets, applied, self.loader.graph)
69 self._validate_plan_direction(plan, direction)
70
71 state_cache_by_key: dict[MigrationKey, State] | None = None
72 if any(step.backward for step in plan):
73 self._logger.debug("Building rollback state cache")
74 state_cache_by_key = await self._project_state_cache(applied)
75
76 state_cache: State | None = None
77 for step in plan:
78 key = MigrationKey(app_label=step.migration.app_label, name=step.migration.name)
79 if step.backward:
80 if state_cache_by_key is not None:
81 state_before = state_cache_by_key[key]
82 else:
83 state_before = await self._project_state(applied, upto=key)
84 if not fake:
85 self._emit(progress, "rollback_start", key)
86 schema_editor = self._schema_editor(atomic=step.migration.atomic)
87 if schema_editor.atomic_migration:
88 async with in_transaction(self.connection.connection_name) as txn_client:
89 schema_editor.client = txn_client
90 await step.migration.unapply(
91 state_before, dry_run=dry_run, schema_editor=schema_editor
92 )
93 else:
94 await step.migration.unapply(
95 state_before, dry_run=dry_run, schema_editor=schema_editor
96 )

Calls

no outgoing calls

Used in the wild — real call sites across dependent graphs

searching dependent graphs…