(self, config: dict, context: ExecutionContext)
| 185 | return "db_insert" |
| 186 | |
async def execute(self, config: dict, context: ExecutionContext) -> ModuleResult:
    """Insert a single row into a table through a previously opened connection.

    Config keys read by this method:
        connectionName: name of the connection to use (defaults to 'default').
        table: target table name (required).
        data: mapping of column name -> value, or a JSON string that decodes
            to such a mapping. String values are passed through
            ``context.resolve_value`` before being bound.
        variableName: optional; when non-empty, the cursor's ``lastrowid`` is
            stored in ``context.variables`` under this name.

    Returns:
        A ``ModuleResult``. On success its ``data`` is
        ``{"lastInsertId": <cursor.lastrowid>}``. Every failure (unknown
        connection, missing table, empty or malformed data, driver error) is
        reported as ``success=False`` with an error message; no exception is
        propagated to the caller.
    """

    def quote_identifier(name) -> str:
        # Identifiers cannot be sent as bound parameters, so they are embedded
        # in the statement text. Doubling any embedded backtick keeps a crafted
        # table/column name from closing the quoted identifier early and
        # injecting SQL.
        return '`' + str(name).replace('`', '``') + '`'

    connection_name = context.resolve_value(config.get('connectionName', 'default'))
    table = context.resolve_value(config.get('table', ''))
    data = config.get('data', {})
    variable_name = context.resolve_value(config.get('variableName', ''))

    connections = get_db_connections(context)
    conn = connections.get(connection_name)

    if not conn:
        return ModuleResult(
            success=False,
            error=f"数据库连接 '{connection_name}' 不存在,请先使用「连接数据库」模块"
        )

    if not table:
        return ModuleResult(success=False, error="请指定表名")

    try:
        # The payload may arrive as a JSON string (possibly containing
        # variable references); resolve it first, then decode.
        if isinstance(data, str):
            import json
            data = json.loads(context.resolve_value(data))

        # Anything other than a mapping (e.g. a JSON list or null) cannot be
        # turned into column/value pairs; reject it explicitly instead of
        # failing later with an opaque AttributeError message.
        if not isinstance(data, dict):
            return ModuleResult(success=False, error="插入数据必须是键值对象")

        # Resolve variable references in string values; other values are
        # bound as-is.
        resolved_data = {
            key: context.resolve_value(value) if isinstance(value, str) else value
            for key, value in data.items()
        }

        if not resolved_data:
            return ModuleResult(success=False, error="插入数据不能为空")

        # Build the INSERT statement: identifiers are quoted/escaped, values
        # are always sent as bound parameters.
        columns = ', '.join(quote_identifier(key) for key in resolved_data)
        placeholders = ', '.join(['%s'] * len(resolved_data))
        sql = f"INSERT INTO {quote_identifier(table)} ({columns}) VALUES ({placeholders})"

        # NOTE(review): no commit() is issued here; this presumably relies on
        # the connection being opened in autocommit mode - confirm against the
        # module that creates the connection.
        with conn.cursor() as cursor:
            cursor.execute(sql, list(resolved_data.values()))
            last_id = cursor.lastrowid

        # Expose the inserted row id to later steps when a variable is named.
        if variable_name:
            context.variables[variable_name] = last_id

        return ModuleResult(
            success=True,
            message=f"插入成功,ID: {last_id}",
            data={"lastInsertId": last_id}
        )
    except Exception as e:
        # Module boundary: any failure is converted into a failed result
        # rather than being raised to the caller.
        return ModuleResult(success=False, error=f"插入失败: {str(e)}")
| 242 | |
| 243 | |
| 244 | @register_executor |
nothing calls this directly
no test coverage detected