MCPcopy Index your code
hub / github.com/shareAI-lab/learn-claude-code / TaskManager

Class TaskManager

agents/s_full.py:262–324  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

260
261# === SECTION: file_tasks (s07) ===
262class TaskManager:
263 def __init__(self):
264 TASKS_DIR.mkdir(exist_ok=True)
265
266 def _next_id(self) -> int:
267 ids = [int(f.stem.split("_")[1]) for f in TASKS_DIR.glob("task_*.json")]
268 return max(ids, default=0) + 1
269
270 def _load(self, tid: int) -> dict:
271 p = TASKS_DIR / f"task_{tid}.json"
272 if not p.exists(): raise ValueError(f"Task {tid} not found")
273 return json.loads(p.read_text())
274
275 def _save(self, task: dict):
276 (TASKS_DIR / f"task_{task['id']}.json").write_text(json.dumps(task, indent=2))
277
278 def create(self, subject: str, description: str = "") -> str:
279 task = {"id": self._next_id(), "subject": subject, "description": description,
280 "status": "pending", "owner": None, "blockedBy": []}
281 self._save(task)
282 return json.dumps(task, indent=2)
283
284 def get(self, tid: int) -> str:
285 return json.dumps(self._load(tid), indent=2)
286
287 def update(self, tid: int, status: str = None,
288 add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
289 task = self._load(tid)
290 if status:
291 task["status"] = status
292 if status == "completed":
293 for f in TASKS_DIR.glob("task_*.json"):
294 t = json.loads(f.read_text())
295 if tid in t.get("blockedBy", []):
296 t["blockedBy"].remove(tid)
297 self._save(t)
298 if status == "deleted":
299 (TASKS_DIR / f"task_{tid}.json").unlink(missing_ok=True)
300 return f"Task {tid} deleted"
301 if add_blocked_by:
302 task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
303 if remove_blocked_by:
304 task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
305 self._save(task)
306 return json.dumps(task, indent=2)
307
308 def list_all(self) -> str:
309 tasks = [json.loads(f.read_text()) for f in sorted(TASKS_DIR.glob("task_*.json"))]
310 if not tasks: return "No tasks."
311 lines = []
312 for t in tasks:
313 m = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
314 owner = f" @{t['owner']}" if t.get("owner") else ""
315 blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
316 lines.append(f"{m} #{t['id']}: {t['subject']}{owner}{blocked}")
317 return "\n".join(lines)
318
319 def claim(self, tid: int, owner: str) -> str:

Callers 1

s_full.pyFile · 0.70

Calls

no outgoing calls

Tested by

no test coverage detected