(config, scope="module")
| 88 | |
| 89 | |
def init_tests(config, scope="module"):
    """Build an autouse pytest-asyncio fixture that resets the database.

    The returned fixture recreates the schemas and tables described by
    ``config.metadata`` before the tests in ``scope`` run, and drops them
    again afterwards.

    Args:
        config: Test configuration object. The fixture reads and rebinds
            ``config.engine`` (an async engine) and reads ``config.metadata``;
            it is also passed through to ``drop_tables``, ``drop_schemas``
            and ``create_schemas``.
        scope: Fixture scope forwarded to ``pytest_asyncio.fixture``.

    Returns:
        The ``create_database`` fixture function; the caller is responsible
        for exposing it where pytest can collect it.
    """

    @pytest_asyncio.fixture(autouse=True, scope=scope)
    async def create_database():
        """Recreate the database objects, yield to the tests, then drop them."""
        # Drop and create tables in a single connection to avoid event loop issues
        async with config.engine.begin() as conn:

            def setup_tables(connection):  # pragma: no cover
                validate_cross_schema_constraints(
                    config.metadata, connection.dialect.name
                )
                # Start from a clean slate: remove leftovers, then rebuild.
                drop_tables(connection, config)
                drop_schemas(connection, config)
                create_schemas(connection, config)
                config.metadata.create_all(connection)

            await conn.run_sync(setup_tables)

        # For PostgreSQL and MySQL, recreate engine to avoid event loop conflicts
        # asyncpg and aiomysql are strict about event loops
        if config.engine.dialect.name in ("postgresql", "mysql"):  # pragma: no cover
            await config.engine.dispose()
            # Remember the engine we were given so teardown can put it back.
            config._original_engine = config.engine
            # NOTE(review): the replacement engine is built from the module-level
            # ASYNC_DATABASE_URL, not from the original engine's URL/options --
            # confirm these are always the same database.
            config.engine = create_async_engine(ASYNC_DATABASE_URL)

        yield

        # Restore the original engine if it was swapped
        if hasattr(config, "_original_engine"):  # pragma: no cover
            try:
                await config.engine.dispose()
            finally:
                # Restore even when disposing the temporary engine fails, so
                # config is never left pointing at the throwaway engine.
                config.engine = config._original_engine
                delattr(config, "_original_engine")

        try:
            async with config.engine.begin() as conn:

                def teardown_tables(connection):  # pragma: no cover
                    drop_tables(connection, config)
                    drop_schemas(connection, config)

                await conn.run_sync(teardown_tables)
        finally:
            # Always release pooled connections, even if dropping objects failed.
            await config.engine.dispose()

    return create_database
no outgoing calls
no test coverage detected