Perform POST request to agent. Args: path: URL path (e.g., /sensor/start) data: Optional JSON body Returns: Parsed JSON response Raises: AgentHTTPError: On HTTP errors AgentConnectionError: If agent is unreachable
(self, path: str, data: dict | None = None, timeout: float | None = None)
| 97 | raise AgentHTTPError(f"Request failed: {e}") |
| 98 | |
def _post(self, path: str, data: dict | None = None, timeout: float | None = None) -> dict:
    """
    Perform POST request to agent.

    Args:
        path: URL path (e.g., /sensor/start)
        data: Optional JSON body; an empty JSON object is sent when omitted
        timeout: Optional per-request timeout in seconds; falls back to
            self.timeout when None

    Returns:
        Parsed JSON response, or an empty dict when the response has no body

    Raises:
        AgentHTTPError: On HTTP errors or any other request failure
        AgentConnectionError: If agent is unreachable or the request times out
    """
    url = f"{self.base_url}{path}"
    request_timeout = self.timeout if timeout is None else timeout
    try:
        response = requests.post(
            url,
            json=data or {},
            headers=self._headers(),
            timeout=request_timeout,
        )
        response.raise_for_status()
        return response.json() if response.content else {}
    except requests.ConnectionError as e:
        raise AgentConnectionError(f"Cannot connect to agent at {self.base_url}: {e}") from e
    except requests.Timeout as e:
        raise AgentConnectionError(f"Request to agent timed out after {request_timeout}s") from e
    except requests.HTTPError as e:
        status_code = e.response.status_code
        error_msg = f"Agent returned error: {status_code}"
        # Best effort: prefer the agent's own error text when the body is a
        # JSON object; otherwise keep the generic status-code message.
        try:
            error_data = e.response.json()
        except ValueError:
            # Body is empty or not valid JSON.
            error_data = None
        if isinstance(error_data, dict):
            if 'message' in error_data:
                error_msg = error_data['message']
            elif 'error' in error_data:
                error_msg = error_data['error']
        raise AgentHTTPError(error_msg, status_code=status_code) from e
    except requests.RequestException as e:
        raise AgentHTTPError(f"Request failed: {e}") from e
| 143 | |
| 144 | def post(self, path: str, data: dict | None = None, timeout: float | None = None) -> dict: |
| 145 | """Public POST method for arbitrary endpoints.""" |
no test coverage detected