Perform a blocking ``POST`` request with a JSON payload. Args: url: Fully qualified URL that accepts the payload. payload: Dictionary that will be serialized and sent as JSON. Returns: Parsed JSON body as a dictionary if the request succeeds; otherwise ``None``.
(self, url: str, payload: Dict[str, Any])
| 196 | return None |
| 197 | |
def _post_json(self, url: str, payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Perform a blocking ``POST`` request with a JSON payload.

    The request is sent with ``self.timeout`` as the timeout and
    ``self._default_headers`` as the headers. Failures are logged at debug
    level and reported to the caller as ``None`` rather than raised.

    Args:
        url: Fully qualified URL that accepts the payload.
        payload: Dictionary that will be serialized and sent as JSON.

    Returns:
        Parsed JSON body of the response if the request succeeds; otherwise
        ``None`` (transport error, 4xx/5xx status, or a body that cannot be
        decoded as JSON). The decoded value is returned as-is; it is not
        checked to be a dictionary.
    """
    try:
        response = requests.post(
            url,
            json=payload,
            timeout=self.timeout,
            headers=self._default_headers,
        )
        # Turn 4xx/5xx responses into an HTTPError so they take the failure path.
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError is caught as well because older Requests releases raise a
        # plain JSON decode error (a ValueError subclass, not a
        # RequestException) when the body is not valid JSON.
        logger.debug("Sync POST request failed for %s: %s", url, e)
        return None
| 215 | |
| 216 | def poll_next_task(self) -> Optional[Task]: |
| 217 | """Poll the server synchronously until a task becomes available. |
no test coverage detected