hub / github.com/agent0ai/agent-zero / process

Method process

api/scheduler_tick.py:22–55
(self, input: Input, request: Request)

Source from the content-addressed store, hash-verified

        return False

    async def process(self, input: Input, request: Request) -> Output:
        # Get timezone from input (do not set if not provided, we then rely on poll() to set it)
        if timezone := input.get("timezone", None):
            Localization.get().set_timezone(timezone)

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        printer = PrintStyle(font_color="green", padding=False)
        printer.print(f"Scheduler tick - API: {timestamp}")

        # Get the task scheduler instance and print detailed debug info
        scheduler = TaskScheduler.get()
        await scheduler.reload()

        tasks = scheduler.get_tasks()
        tasks_count = len(tasks)

        # Log information about the tasks
        printer.print(f"Scheduler has {tasks_count} task(s)")
        if tasks_count > 0:
            for task in tasks:
                printer.print(f"Task: {task.name} (UUID: {task.uuid}, State: {task.state})")

        # Run the scheduler tick
        await scheduler.tick()

        # Get updated tasks after tick
        serialized_tasks = scheduler.serialize_all_tasks()

        return {
            "scheduler": "tick",
            "timestamp": timestamp,
            "tasks_count": tasks_count,
            "tasks": serialized_tasks
        }
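
The control flow above can be exercised without the agent-zero runtime. The following is a minimal sketch, not the project's code: `StubScheduler`, `process_sketch`, and the `set_timezone` callback are hypothetical stand-ins for `TaskScheduler`, the `process` method, and `Localization.get().set_timezone`. It illustrates two properties visible in the source: a missing or empty `timezone` is skipped by the walrus check, and `tasks_count` is taken before `tick()` while `tasks` is serialized after it.

```python
import asyncio
from datetime import datetime


class StubScheduler:
    """Hypothetical stand-in for TaskScheduler (not the real agent-zero class)."""

    def __init__(self, tasks):
        self._tasks = list(tasks)

    async def reload(self):
        pass  # the real scheduler reloads its task list here

    def get_tasks(self):
        return list(self._tasks)

    async def tick(self):
        # Assumption for illustration only: a tick appends one task,
        # so the count taken before the tick goes stale.
        self._tasks.append({"name": "spawned"})

    def serialize_all_tasks(self):
        return [dict(t) for t in self._tasks]


async def process_sketch(input, scheduler, set_timezone):
    # Falsy values (missing key, None, "") skip the timezone update.
    if timezone := input.get("timezone"):
        set_timezone(timezone)

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    await scheduler.reload()
    tasks_count = len(scheduler.get_tasks())  # snapshot before the tick

    await scheduler.tick()

    return {
        "scheduler": "tick",
        "timestamp": timestamp,
        "tasks_count": tasks_count,
        "tasks": scheduler.serialize_all_tasks(),  # state after the tick
    }


seen_timezones = []
result = asyncio.run(
    process_sketch({"timezone": ""}, StubScheduler([{"name": "a"}]), seen_timezones.append)
)
```

In this stub the two figures disagree (`tasks_count` is 1 while `tasks` has 2 entries) only because the stub's `tick()` adds a task. Whether the real `tick()` changes the task list is not shown in the source.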

Callers

nothing calls this directly

Calls 9

print · Method · 0.95
PrintStyle · Class · 0.90
set_timezone · Method · 0.80
tick · Method · 0.80
serialize_all_tasks · Method · 0.80
get · Method · 0.45
now · Method · 0.45
reload · Method · 0.45
get_tasks · Method · 0.45

Tested by

no test coverage detected
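
One possible way to start covering the scheduler interaction is with `unittest.mock`. The sketch below rests on assumptions: `run_tick` is a hypothetical reduction of the scheduler calls made in `process`, not the real method, and the mocked scheduler mirrors only the four method names that appear in the source. It checks that the calls happen in the order reload, get_tasks, tick, serialize_all_tasks.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock, call


async def run_tick(scheduler):
    """Hypothetical reduction of the scheduler calls made by process()."""
    await scheduler.reload()
    count = len(scheduler.get_tasks())
    await scheduler.tick()
    return {"tasks_count": count, "tasks": scheduler.serialize_all_tasks()}


def test_call_order():
    scheduler = MagicMock()
    # attach_mock records calls to the async children on the parent mock.
    scheduler.attach_mock(AsyncMock(), "reload")
    scheduler.attach_mock(AsyncMock(), "tick")
    scheduler.get_tasks.return_value = ["t1", "t2"]
    scheduler.serialize_all_tasks.return_value = [{"name": "t1"}, {"name": "t2"}]

    result = asyncio.run(run_tick(scheduler))

    # The parent mock logs child calls in the order they were made.
    assert scheduler.mock_calls == [
        call.reload(),
        call.get_tasks(),
        call.tick(),
        call.serialize_all_tasks(),
    ]
    assert result["tasks_count"] == 2
    return result
```

A test against the real handler would replace `run_tick` with the actual `process` method and patch `TaskScheduler.get` to return the mock; the patch target depends on how the module imports it, which the listing does not show.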