Register a new cron job. Returns CronJob or error string.
(cron: str, prompt: str, recurring: bool = True,
durable: bool = True)
| 493 | |
| 494 | |
| 495 | def schedule_job(cron: str, prompt: str, recurring: bool = True, |
| 496 | durable: bool = True) -> CronJob | str: |
| 497 | """Register a new cron job. Returns CronJob or error string.""" |
| 498 | err = validate_cron(cron) |
| 499 | if err: |
| 500 | return err |
| 501 | job = CronJob( |
| 502 | id=f"cron_{random.randint(0, 999999):06d}", |
| 503 | cron=cron, prompt=prompt, |
| 504 | recurring=recurring, durable=durable, |
| 505 | ) |
| 506 | with cron_lock: |
| 507 | scheduled_jobs[job.id] = job |
| 508 | if durable: |
| 509 | save_durable_jobs() |
| 510 | print(f" \033[35m[cron register] {job.id} '{cron}' → {prompt[:40]}\033[0m") |
| 511 | return job |
| 512 | |
| 513 | |
| 514 | def cancel_job(job_id: str) -> str: |
no test coverage detected