| 122 | parameters: dict = field(default_factory=lambda: param_schema) |
| 123 | |
async def call(
    self,
    context: ContextWrapper[AstrAgentContext],
    code: str,
    silent: bool = False,
    timeout: int = 30,
) -> ToolExecResult:
    """Execute ``code`` with the local booter's Python executor.

    The call is refused up front when ``check_admin_permission`` returns a
    value for this context; that value is returned unchanged.

    Args:
        context: Wrapper giving access to the current event and to
            ``tool_call_timeout``.
        code: Source text handed to ``sb.python.exec``.
        silent: Forwarded unchanged to ``sb.python.exec``.
        timeout: Requested timeout. A positive value is capped at
            ``context.tool_call_timeout``; zero or a negative value
            selects ``context.tool_call_timeout`` itself.

    Returns:
        The permission-check value when one is produced, otherwise whatever
        ``handle_result`` returns for the execution result, or an
        ``"Error executing code: ..."`` string if any step raised.
    """
    if permission_error := check_admin_permission(context, "Python execution"):
        return permission_error
    sb = get_local_booter()
    # Never exceed the context-wide limit; non-positive requests defer to it.
    effective_timeout = (
        min(timeout, context.tool_call_timeout)
        if timeout > 0
        else context.tool_call_timeout
    )
    try:
        # The working directory is derived from the event's
        # unified_msg_origin and created on demand before execution.
        current_workspace_root = workspace_root(
            context.context.event.unified_msg_origin
        )
        current_workspace_root.mkdir(parents=True, exist_ok=True)
        result = await sb.python.exec(
            code,
            timeout=effective_timeout,
            silent=silent,
            cwd=str(current_workspace_root),
        )
        return await handle_result(result, context.context.event)
    except Exception as e:
        # Exceptions raised without arguments (e.g. a bare TimeoutError)
        # stringify to "", which would leave the message empty; fall back to
        # the exception class name so the failure is still identifiable.
        detail = str(e) or type(e).__name__
        return f"Error executing code: {detail}"