Create a Task from a background callback's initial dispatch response.
(dispatch_response: dict[str, Any], callback)
| 37 | |
| 38 | |
def create_task(dispatch_response: dict[str, Any], callback) -> CreateTaskResult:
    """Create a Task from a background callback's initial dispatch response.

    Args:
        dispatch_response: Initial dispatch payload. Its ``"cacheKey"`` and
            ``"job"`` entries are read; the job value is stringified.
        callback: Object exposing ``tool_name`` and a ``_cb_info`` mapping.
            The optional ``_cb_info["background"]["interval"]`` entry is
            forwarded as the task's ``pollInterval`` (defaults to 1000).

    Returns:
        A ``CreateTaskResult`` wrapping a ``Task`` in the ``"working"``
        state whose ``createdAt`` and ``lastUpdatedAt`` are both the
        (second-resolution, UTC) creation time.

    Raises:
        KeyError: If ``dispatch_response`` lacks ``"cacheKey"`` or ``"job"``.
    """
    cache_key = dispatch_response["cacheKey"]
    job_id = str(dispatch_response["job"])
    # Truncate to whole seconds: the taskId below only carries
    # int(timestamp), so a createdAt rebuilt from the id cannot recover
    # sub-second precision. Reporting the truncated value here keeps the
    # initial createdAt identical to whatever is later derived from the id.
    now = datetime.now(timezone.utc).replace(microsecond=0)
    # taskId encodes creation time so subsequent polls return a stable
    # createdAt without server-side storage (works across gunicorn workers).
    task_id = f"{callback.tool_name}:{job_id}:{cache_key}:{int(now.timestamp())}"
    # pylint: disable-next=protected-access
    interval = callback._cb_info.get("background", {}).get("interval", 1000)
    return CreateTaskResult(
        task=Task(
            taskId=task_id,
            status="working",
            createdAt=now,
            lastUpdatedAt=now,
            ttl=None,
            pollInterval=interval,
        ),
    )
| 59 | |
| 60 | |
| 61 | def get_task(task_id: str) -> GetTaskResult: |