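Poll the server synchronously until a task becomes available. Returns: The next [`Task`][agentlightning.Task] available for execution. The return annotation allows ``None``, but the visible loop retries after `poll_interval` seconds when a poll fails or yields no task rather than returning ``None``.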
(self)
| 214 | return None |
| 215 | |
def poll_next_task(self) -> Optional[Task]:
    """Block until the server hands out a task, polling at a fixed interval.

    The next-task URL is built once from ``self.endpoint`` and
    ``self._next_task_uri``. Each iteration requests that URL; a falsy
    response, or a response that does not carry an available task, is
    logged at debug level and retried after ``self.poll_interval`` seconds.

    Returns:
        The next [`Task`][agentlightning.Task] available for execution.
        The annotation stays ``Optional`` for interface compatibility, but
        the loop below only exits by returning a task (or by an exception
        propagating from the request or validation step).
    """
    next_task_url = urllib.parse.urljoin(self.endpoint, self._next_task_uri)
    while True:
        payload = self._request_json(next_task_url)
        # Only validate when the request produced something; a falsy payload
        # is treated the same as "no task yet".
        candidate = TaskIfAny.model_validate(payload) if payload else None
        if candidate is not None and candidate.is_available and candidate.task:
            self.task_count += 1
            logger.info(f"[Task {self.task_count} Received] ID: {candidate.task.rollout_id}")
            return candidate.task
        logger.debug(f"No task available yet. Retrying in {self.poll_interval} seconds...")
        time.sleep(self.poll_interval)
| 234 | |
| 235 | def get_resources_by_id(self, resource_id: str) -> Optional[ResourcesUpdate]: |
| 236 | """Fetch a specific resource bundle by identifier. |
nothing calls this directly
no test coverage detected