Run database migrations to the latest version using programmatic Alembic configuration. This function is safe to call from multiple distributed workers simultaneously: - Uses PostgreSQL advisory lock to ensure only one worker runs migrations at a time - Other workers wait for the l
(
database_url: str,
script_location: str | None = None,
schema: str | None = None,
migration_database_url: str | None = None,
)
| 234 | |
| 235 | |
| 236 | def run_migrations( |
| 237 | database_url: str, |
| 238 | script_location: str | None = None, |
| 239 | schema: str | None = None, |
| 240 | migration_database_url: str | None = None, |
| 241 | ) -> None: |
| 242 | """ |
| 243 | Run database migrations to the latest version using programmatic Alembic configuration. |
| 244 | |
| 245 | This function is safe to call from multiple distributed workers simultaneously: |
| 246 | - Uses PostgreSQL advisory lock to ensure only one worker runs migrations at a time |
| 247 | - Other workers wait for the lock, then verify migrations are complete |
| 248 | - If schema is already up-to-date, this is a fast no-op |
| 249 | |
| 250 | Supports multi-tenant schema isolation: when a schema is specified, migrations |
| 251 | run in that schema instead of public. This allows tenant extensions to provision |
| 252 | new tenant schemas with their own isolated tables. |
| 253 | |
| 254 | Args: |
| 255 | database_url: SQLAlchemy database URL (e.g., "postgresql://user:pass@host/db") |
| 256 | script_location: Path to alembic migrations directory (e.g., "/path/to/alembic"). |
| 257 | If None, defaults to hindsight-api/alembic directory. |
| 258 | schema: Target PostgreSQL schema name. If None, uses default (public). |
| 259 | When specified, creates the schema if needed and runs migrations there. |
| 260 | |
| 261 | Raises: |
| 262 | RuntimeError: If migrations fail to complete |
| 263 | FileNotFoundError: If script_location doesn't exist |
| 264 | |
| 265 | Example: |
| 266 | # Using default location and public schema |
| 267 | run_migrations("postgresql://user:pass@host/db") |
| 268 | |
| 269 | # Run migrations for a specific tenant schema |
| 270 | run_migrations("postgresql://user:pass@host/db", schema="tenant_acme") |
| 271 | |
| 272 | # Using custom location (when importing from another project) |
| 273 | run_migrations( |
| 274 | "postgresql://user:pass@host/db", |
| 275 | script_location="/path/to/copied/_alembic" |
| 276 | ) |
| 277 | """ |
| 278 | # Prefer a dedicated migration URL that bypasses connection poolers (e.g. |
| 279 | # PgBouncer in transaction mode). Session-level advisory locks don't |
| 280 | # survive a PgBouncer transaction-mode cycle, so the distributed lock is |
| 281 | # ineffective when the app URL goes through a pooler. Configure |
| 282 | # HINDSIGHT_API_MIGRATION_DATABASE_URL to the direct PostgreSQL endpoint |
| 283 | # (e.g. hindsight-pg-rw) to restore correct locking behaviour. |
| 284 | raw_url = migration_database_url or database_url |
| 285 | # Oracle URLs are passed through to SQLAlchemy unchanged; only PG URLs |
| 286 | # need the libpq normalization (asyncpg → psycopg2 driver, ssl → sslmode). |
| 287 | migration_url = raw_url if is_oracle_url(raw_url) else to_libpq_url(raw_url) |
| 288 | |
| 289 | try: |
| 290 | # Determine script location |
| 291 | if script_location is None: |
| 292 | # Default: use the alembic directory inside the hindsight_api package |
| 293 | # This file is in: hindsight_api/migrations.py |