MCPcopy
hub / github.com/agent0ai/agent-zero / update_task

Method update_task

tools/scheduler.py:251–302  ·  view source on GitHub ↗
(self, **kwargs)

Source from the content-addressed store, hash-verified

249 return Response(message=f"Task failed to delete: {task_uuid}", break_loop=False)
250
async def update_task(self, **kwargs) -> Response:
    """Update an existing scheduler task and return its serialized form.

    Recognised keyword arguments:
        uuid: Required. UUID of the task to update.
        name, system_prompt, prompt, attachments: Copied through to the
            scheduler unchanged when present.
        state: New task state; converted with ``TaskState(...)``.
        dedicated_context: Truthy -> the task's own UUID becomes its
            context id; falsy -> the calling agent's context id is used.
        schedule: New schedule input (only applied to ``ScheduledTask``).
        plan: New plan input (only applied to ``PlannedTask``).
        Timezone input is read from ``kwargs`` by ``_schedule_timezone``.

    Returns:
        A ``Response`` (always ``break_loop=False``) whose message is either
        a human-readable validation/lookup error or the updated task as
        indented JSON.
    """
    task_uuid: str = kwargs.get("uuid", "")
    if not task_uuid:
        return Response(message="Task UUID is required", break_loop=False)

    scheduler = TaskScheduler.get()
    await scheduler.reload()
    task: ScheduledTask | AdHocTask | PlannedTask | None = scheduler.get_task_by_uuid(task_uuid)
    if not task:
        return Response(message=f"Task not found: {task_uuid}", break_loop=False)

    # Plain pass-through fields: only forward the ones the caller supplied.
    passthrough_fields = ("name", "system_prompt", "prompt", "attachments")
    update_params: dict[str, Any] = {
        field: kwargs[field] for field in passthrough_fields if field in kwargs
    }

    if "state" in kwargs:
        # An unrecognised state value must surface as an error Response rather
        # than an unhandled exception (assumes TaskState is an Enum whose
        # value lookup raises ValueError -- TODO confirm).
        try:
            update_params["state"] = TaskState(kwargs["state"])
        except ValueError:
            return Response(
                message=f"Invalid task state: {kwargs['state']}",
                break_loop=False,
            )

    if "dedicated_context" in kwargs:
        dedicated_context = bool(kwargs.get("dedicated_context"))
        update_params["context_id"] = task.uuid if dedicated_context else self.agent.context.id

    # Schedule handling: rebuild the schedule when either a new schedule or a
    # timezone was supplied, falling back to the task's current schedule.
    try:
        schedule_tz = _schedule_timezone(kwargs)
        if isinstance(task, ScheduledTask) and ("schedule" in kwargs or schedule_tz):
            task_schedule = _task_schedule_from_input(
                kwargs.get("schedule") or serialize_task(task).get("schedule") or {},
                timezone=schedule_tz,
            )
            if err := _validate_task_schedule(task_schedule):
                return Response(message=err, break_loop=False)
            update_params["schedule"] = task_schedule
    except ValueError as exc:
        return Response(message=str(exc), break_loop=False)

    # The schedule stored above has already been validated, so only the plan
    # still needs parsing here.
    if isinstance(task, PlannedTask) and "plan" in kwargs:
        task_plan, err = _task_plan_from_input(kwargs.get("plan") or [])
        if err:
            return Response(message=err, break_loop=False)
        update_params["plan"] = task_plan

    updated_task = await scheduler.update_task(task_uuid, **update_params)
    await scheduler.save()
    if not updated_task:
        return Response(message=f"Task failed to update: {task_uuid}", break_loop=False)

    return Response(message=json.dumps(serialize_task(updated_task), indent=4), break_loop=False)
303
304 async def create_scheduled_task(self, **kwargs) -> Response:
305 # "name": "XXX",

Callers 4

execute (Method) · 0.95
delete_task (Method) · 0.45
process (Method) · 0.45
process (Method) · 0.45

Calls 11

Response (Class) · 0.90
TaskState (Class) · 0.90
serialize_task (Function) · 0.90
_schedule_timezone (Function) · 0.85
_validate_task_schedule (Function) · 0.85
_task_plan_from_input (Function) · 0.85
get (Method) · 0.45
reload (Method) · 0.45
get_task_by_uuid (Method) · 0.45
save (Method) · 0.45

Tested by

no test coverage detected