(self, config: dict, context: ExecutionContext)
| 27 | return "db_connect" |
| 28 | |
async def execute(self, config: dict, context: ExecutionContext) -> ModuleResult:
    """Open a MySQL connection and register it under a connection name.

    Settings are read from ``config`` and passed through
    ``context.resolve_value`` (the port goes through ``to_int`` instead):

    * ``host`` (default ``'localhost'``)
    * ``port`` (default ``3306``)
    * ``user`` (default ``'root'``)
    * ``password`` (default ``''``)
    * ``database`` (default ``''``; an empty value connects without
      selecting a database)
    * ``charset`` (default ``'utf8mb4'``)
    * ``connectionName`` (default ``'default'``)

    A connection already registered under the same name is closed and
    removed before the new one is stored. The new connection uses
    ``DictCursor`` and ``autocommit=True``.

    Returns:
        A successful ``ModuleResult`` whose ``data`` carries
        ``connectionName``, or a failed ``ModuleResult`` with the error
        text if closing/connecting/registering raises.
    """
    import asyncio
    import functools

    import pymysql
    from pymysql.cursors import DictCursor

    host = context.resolve_value(config.get('host', 'localhost'))
    port = to_int(config.get('port', 3306), 3306, context)
    user = context.resolve_value(config.get('user', 'root'))
    password = context.resolve_value(config.get('password', ''))
    database = context.resolve_value(config.get('database', ''))
    charset = context.resolve_value(config.get('charset', 'utf8mb4'))
    connection_name = context.resolve_value(config.get('connectionName', 'default'))

    connections = get_db_connections(context)

    def close_registered() -> None:
        """Close and drop the connection currently stored under the name."""
        if connection_name in connections:
            try:
                connections[connection_name].close()
            except Exception:
                # Best effort: a stale or already-dead connection must not
                # prevent the reconnect.
                pass
            del connections[connection_name]

    try:
        # If a connection with the same name already exists, close it first.
        close_registered()

        # Create the new connection. pymysql.connect() performs blocking
        # socket I/O, so run it in the default executor instead of stalling
        # the event loop for the duration of the handshake.
        connect = functools.partial(
            pymysql.connect,
            host=host,
            port=port,
            user=user,
            password=password,
            database=database if database else None,
            charset=charset,
            cursorclass=DictCursor,
            autocommit=True,
        )
        loop = asyncio.get_running_loop()
        conn = await loop.run_in_executor(None, connect)

        # Another task may have registered the same name while we were
        # awaiting; close it so it is not silently leaked by the overwrite.
        close_registered()
        connections[connection_name] = conn

        db_info = f"{host}:{port}"
        if database:
            db_info += f"/{database}"

        return ModuleResult(
            success=True,
            message=f"成功连接到数据库 {db_info}",
            data={"connectionName": connection_name},
        )
    except Exception as e:
        return ModuleResult(success=False, error=f"连接数据库失败: {str(e)}")
| 76 | |
| 77 | |
| 78 | @register_executor |
nothing calls this directly
no test coverage detected