Load durable jobs from disk on startup.
()
| 473 | |
| 474 | |
| 475 | def load_durable_jobs(): |
| 476 | """Load durable jobs from disk on startup.""" |
| 477 | if not DURABLE_PATH.exists(): |
| 478 | return |
| 479 | try: |
| 480 | jobs = json.loads(DURABLE_PATH.read_text()) |
| 481 | for j in jobs: |
| 482 | job = CronJob(**j) |
| 483 | err = validate_cron(job.cron) |
| 484 | if err: |
| 485 | print(f" \033[31m[cron] skipping invalid job {job.id}: {err}\033[0m") |
| 486 | continue |
| 487 | scheduled_jobs[job.id] = job |
| 488 | valid = [j for j in jobs if j["id"] in scheduled_jobs] |
| 489 | if valid: |
| 490 | print(f" \033[35m[cron] loaded {len(valid)} durable job(s)\033[0m") |
| 491 | except Exception: |
| 492 | pass |
| 493 | |
| 494 | |
| 495 | def schedule_job(cron: str, prompt: str, recurring: bool = True, |
no test coverage detected