Run migrations for configured apps.
(
*,
config: dict[str, Any] | TortoiseConfig | None = None,
config_file: str | None = None,
app_labels: Sequence[str] | None = None,
target: str | None = None,
fake: bool = False,
dry_run: bool = False,
direction: str = "both",
reporter: Callable[[str, list[PlanStep], bool, bool], object] | None = None,
progress: Callable[[str, str, str], object] | None = None,
)
| 11 | |
| 12 | |
| 13 | async def migrate( |
| 14 | *, |
| 15 | config: dict[str, Any] | TortoiseConfig | None = None, |
| 16 | config_file: str | None = None, |
| 17 | app_labels: Sequence[str] | None = None, |
| 18 | target: str | None = None, |
| 19 | fake: bool = False, |
| 20 | dry_run: bool = False, |
| 21 | direction: str = "both", |
| 22 | reporter: Callable[[str, list[PlanStep], bool, bool], object] | None = None, |
| 23 | progress: Callable[[str, str, str], object] | None = None, |
| 24 | ) -> None: |
| 25 | """Run migrations for configured apps.""" |
| 26 | if isinstance(config, TortoiseConfig): |
| 27 | config = config.to_dict() |
| 28 | if config_file: |
| 29 | config = Tortoise._get_config_from_config_file(config_file) |
| 30 | if not config: |
| 31 | raise ValueError("migrate requires a config or config_file") |
| 32 | |
| 33 | await Tortoise.init(config=config, init_connections=False) |
| 34 | |
| 35 | configured_apps = config.get("apps", {}) |
| 36 | selected_apps = list(app_labels) if app_labels else list(configured_apps.keys()) |
| 37 | for label in selected_apps: |
| 38 | if label not in configured_apps: |
| 39 | raise ValueError(f"Unknown app label {label}") |
| 40 | |
| 41 | apps_config = {label: configured_apps[label] for label in selected_apps} |
| 42 | apps_by_connection: dict[str, dict[str, dict[str, Any]]] = {} |
| 43 | for label, app_config in apps_config.items(): |
| 44 | connection_name = app_config.get("default_connection", "default") |
| 45 | apps_by_connection.setdefault(connection_name, {})[label] = app_config |
| 46 | |
| 47 | targets = _parse_targets(target, selected_apps) |
| 48 | for connection_name, subset in apps_by_connection.items(): |
| 49 | connection = get_connection(connection_name) |
| 50 | executor = MigrationExecutor(connection, subset) |
| 51 | executor_targets = [t for t in targets if t.app_label in subset] |
| 52 | if reporter is not None: |
| 53 | plan = await executor.plan(executor_targets if executor_targets else None) |
| 54 | result = reporter(connection_name, plan, fake, dry_run) |
| 55 | if inspect.isawaitable(result): |
| 56 | await result |
| 57 | await executor.migrate( |
| 58 | executor_targets if executor_targets else None, |
| 59 | fake=fake, |
| 60 | dry_run=dry_run, |
| 61 | direction=direction, |
| 62 | progress=progress, |
| 63 | ) |
| 64 | |
| 65 | |
| 66 | def _parse_targets(target: str | None, app_labels: Sequence[str]) -> list[MigrationTarget]: |
searching dependent graphs…