Provision a fresh non-default schema with the full migration tree, then swap the memory engine's tenant extension so all subsequent operations resolve to it. Drops the schema on teardown.
(memory: MemoryEngine, pg0_db_url)
| 1016 | |
@pytest_asyncio.fixture
async def isolated_schema(memory: MemoryEngine, pg0_db_url):
    """Provision a fresh non-default schema and point the memory engine at it.

    Runs the full migration tree against a uniquely named schema, then swaps
    the memory engine's tenant extension so all subsequent operations resolve
    to that schema. On teardown the original tenant extension is restored and
    the schema is dropped, including when provisioning itself fails.

    Args:
        memory: Engine whose ``_tenant_extension`` is replaced for the
            duration of the fixture.
        pg0_db_url: Database URL handed to ``run_migrations`` and to
            ``asyncpg.connect`` for the teardown connection.

    Yields:
        The name of the provisioned schema.
    """
    import asyncpg

    from hindsight_api.migrations import run_migrations

    schema_name = f"tenant_wh_iso_{uuid.uuid4().hex[:8]}"

    # Capture the current extension before anything can fail so the ``finally``
    # block always has a valid value to restore.
    original_ext = memory._tenant_extension

    try:
        # Run migrations to provision the schema with all tables (webhooks,
        # banks, async_operations, ...). This is the same path a real
        # multi-tenant extension would take to provision a new tenant schema.
        #
        # Provisioning lives inside the ``try`` so that a migration failing
        # part-way still reaches the DROP below instead of leaving a
        # half-built schema behind in the database.
        run_migrations(pg0_db_url, schema=schema_name)

        memory._tenant_extension = _NonDefaultSchemaTenantExtension(schema_name)

        yield schema_name
    finally:
        memory._tenant_extension = original_ext
        # Drop the test schema. Use a dedicated connection so we don't depend
        # on the pool's state. ``IF EXISTS`` keeps this safe when provisioning
        # failed before the schema was ever created.
        conn = await asyncpg.connect(pg0_db_url)
        try:
            await conn.execute(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE')
        finally:
            await conn.close()
| 1048 | |
| 1049 | |
| 1050 | class TestWebhookSchemaIsolation: |
nothing calls this directly
no test coverage detected