(self, input: Input, request: Request)
| 20 | return False |
| 21 | |
async def process(self, input: Input, request: Request) -> Output:
    """Reload the scheduler, log its tasks, run one tick, and report the result.

    When ``input`` carries a truthy ``"timezone"`` entry it is applied via
    ``Localization`` before anything else happens; otherwise the timezone is
    left untouched.

    Returns a dict with the keys ``"scheduler"``, ``"timestamp"``,
    ``"tasks_count"`` and ``"tasks"``. ``"tasks_count"`` is measured before
    the tick runs, whereas ``"tasks"`` is serialized after it.
    """
    # Leave the timezone alone when the caller sent none; per the original
    # author's note, poll() is then relied on to set it.
    requested_tz = input.get("timezone", None)
    if requested_tz:
        Localization.get().set_timezone(requested_tz)

    tick_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log = PrintStyle(font_color="green", padding=False)
    log.print(f"Scheduler tick - API: {tick_time}")

    # Refresh the scheduler before inspecting its tasks.
    scheduler = TaskScheduler.get()
    await scheduler.reload()

    known_tasks = scheduler.get_tasks()
    known_count = len(known_tasks)

    # One summary line, then one line per task (none when the list is empty).
    log.print(f"Scheduler has {known_count} task(s)")
    for entry in known_tasks:
        log.print(f"Task: {entry.name} (UUID: {entry.uuid}, State: {entry.state})")

    await scheduler.tick()

    # Tasks are serialized only after the tick has run.
    return {
        "scheduler": "tick",
        "timestamp": tick_time,
        "tasks_count": known_count,
        "tasks": scheduler.serialize_all_tasks(),
    }
nothing calls this directly
no test coverage detected