MCPcopy
hub / github.com/shareAI-lab/learn-claude-code / load_durable_jobs

Function load_durable_jobs

s15_agent_teams/code.py:475–492  ·  view source on GitHub ↗

Load durable jobs from disk on startup.

()

Source from the content-addressed store, hash-verified

473
474
475def load_durable_jobs():
476 """Load durable jobs from disk on startup."""
477 if not DURABLE_PATH.exists():
478 return
479 try:
480 jobs = json.loads(DURABLE_PATH.read_text())
481 for j in jobs:
482 job = CronJob(**j)
483 err = validate_cron(job.cron)
484 if err:
485 print(f" \033[31m[cron] skipping invalid job {job.id}: {err}\033[0m")
486 continue
487 scheduled_jobs[job.id] = job
488 valid = [j for j in jobs if j["id"] in scheduled_jobs]
489 if valid:
490 print(f" \033[35m[cron] loaded {len(valid)} durable job(s)\033[0m")
491 except Exception:
492 pass
493
494
495def schedule_job(cron: str, prompt: str, recurring: bool = True,

Callers 1

code.pyFile · 0.70

Calls 3

existsMethod · 0.80
CronJobClass · 0.70
validate_cronFunction · 0.70

Tested by

no test coverage detected