MCPcopy
hub / github.com/agent0ai/agent-zero / process

Method process

api/scheduler_task_create.py:13–163  ·  view source on GitHub ↗

Create a new task in the scheduler

(self, input: Input, request: Request)

Source from the content-addressed store, hash-verified

11
12class SchedulerTaskCreate(ApiHandler):
13 async def process(self, input: Input, request: Request) -> Output:
14 """
15 Create a new task in the scheduler
16 """
17 printer = PrintStyle(italic=True, font_color="blue", padding=False)
18
19 # Get timezone from input (do not set if not provided, we then rely on poll() to set it)
20 if timezone := input.get("timezone", None):
21 Localization.get().set_timezone(timezone)
22
23 scheduler = TaskScheduler.get()
24 await scheduler.reload()
25
26 # Get common fields from input
27 name = input.get("name")
28 system_prompt = input.get("system_prompt", "")
29 prompt = input.get("prompt")
30 attachments = input.get("attachments", [])
31
32 requested_project_slug = input.get("project_name")
33 if isinstance(requested_project_slug, str):
34 requested_project_slug = requested_project_slug.strip() or None
35 else:
36 requested_project_slug = None
37
38 project_slug = requested_project_slug
39 project_color = None
40
41 if project_slug:
42 try:
43 metadata = load_basic_project_data(requested_project_slug)
44 project_color = metadata.get("color") or None
45 except Exception as exc:
46 printer.error(f"SchedulerTaskCreate: failed to load project '{project_slug}': {exc}")
47 return {"error": f"Saving project failed: {project_slug}"}
48
49 # Always dedicated context for scheduler tasks created by ui
50 task_context_id = None
51
52 # Check if schedule is provided (for ScheduledTask)
53 schedule = input.get("schedule", {})
54 token: str = input.get("token", "")
55
56 # Debug log the token value
57 printer.print(f"Token received from frontend: '{token}' (type: {type(token)}, length: {len(token) if token else 0})")
58
59 # Generate a random token if empty or not provided
60 if not token:
61 token = str(random.randint(1000000000000000000, 9999999999999999999))
62 printer.print(f"Generated new token: '{token}'")
63
64 plan = input.get("plan", {})
65
66 # Validate required fields
67 if not name or not prompt:
68 # return {"error": "Missing required fields: name, system_prompt, prompt"}
69 raise ValueError("Missing required fields: name, system_prompt, prompt")
70

Callers

nothing calls this directly

Calls 15

error · Method · 0.95
print · Method · 0.95
PrintStyle · Class · 0.90
load_basic_project_data · Function · 0.90
TaskSchedule · Class · 0.90
parse_task_schedule · Function · 0.90
parse_task_plan · Function · 0.90
serialize_task · Function · 0.90
type · Function · 0.85
set_timezone · Method · 0.80
get · Method · 0.45
reload · Method · 0.45

Tested by

no test coverage detected