检查数据库连接
(self)
| 68 | return True |
| 69 | |
def check_database_connection(self) -> bool:
    """Check whether the configured database can be reached.

    Builds an async connection URL from ``settings`` (``postgresql+asyncpg``
    when ``DB_DIALECT`` is ``"postgresql"``, ``mysql+asyncmy`` otherwise),
    opens a single connection, runs ``SELECT 1`` and disposes of the engine.

    Returns:
        True if the probe query completed without raising; False if any
        exception occurred (the exception is logged with its traceback).
    """
    logger.info("检查数据库连接...")

    def _compose_url() -> str:
        # An unset or empty DB_DIALECT falls back to MySQL.
        dialect_name = (settings.DB_DIALECT or "mysql").lower()
        # Only the password is URL-quoted; the other parts are inserted as-is.
        target = (
            f"{settings.DB_USER}:{quote_plus(settings.DB_PASSWORD)}"
            f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
        )
        if dialect_name != "postgresql":
            # Every non-PostgreSQL dialect goes through the asyncmy MySQL driver.
            return f"mysql+asyncmy://{target}?charset={settings.DB_CHARSET}"
        return f"postgresql+asyncpg://{target}"

    async def _probe(url: str) -> None:
        probe_engine: AsyncEngine = create_async_engine(url, pool_pre_ping=True)
        try:
            async with probe_engine.connect() as connection:
                await connection.execute(text("SELECT 1"))
        finally:
            # Dispose of the throwaway engine whether or not the probe succeeded.
            await probe_engine.dispose()

    try:
        # URL construction stays inside the try so that a configuration error
        # is reported as a failed check rather than propagating to the caller.
        asyncio.run(_probe(_compose_url()))
        logger.info("数据库连接正常")
        return True
    except Exception as exc:
        logger.exception(f"数据库连接失败: {exc}")
        return False
| 100 | |
| 101 | def check_database_tables(self) -> bool: |
| 102 | """检查数据库表是否存在""" |
no test coverage detected