MCPcopy Index your code
hub / github.com/shareAI-lab/learn-claude-code / BackgroundManager

Class BackgroundManager

agents/s_full.py:328–360  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

326
327# === SECTION: background (s08) ===
328class BackgroundManager:
329 def __init__(self):
330 self.tasks = {}
331 self.notifications = Queue()
332
333 def run(self, command: str, timeout: int = 120) -> str:
334 tid = str(uuid.uuid4())[:8]
335 self.tasks[tid] = {"status": "running", "command": command, "result": None}
336 threading.Thread(target=self._exec, args=(tid, command, timeout), daemon=True).start()
337 return f"Background task {tid} started: {command[:80]}"
338
339 def _exec(self, tid: str, command: str, timeout: int):
340 try:
341 r = subprocess.run(command, shell=True, cwd=WORKDIR,
342 capture_output=True, text=True, timeout=timeout)
343 output = (r.stdout + r.stderr).strip()[:50000]
344 self.tasks[tid].update({"status": "completed", "result": output or "(no output)"})
345 except Exception as e:
346 self.tasks[tid].update({"status": "error", "result": str(e)})
347 self.notifications.put({"task_id": tid, "status": self.tasks[tid]["status"],
348 "result": self.tasks[tid]["result"][:500]})
349
350 def check(self, tid: str = None) -> str:
351 if tid:
352 t = self.tasks.get(tid)
353 return f"[{t['status']}] {t.get('result') or '(running)'}" if t else f"Unknown: {tid}"
354 return "\n".join(f"{k}: [{v['status']}] {v['command'][:60]}" for k, v in self.tasks.items()) or "No bg tasks."
355
356 def drain(self) -> list:
357 notifs = []
358 while not self.notifications.empty():
359 notifs.append(self.notifications.get_nowait())
360 return notifs
361
362
363# === SECTION: messaging (s09) ===

Callers 1

s_full.pyFile · 0.70

Calls

no outgoing calls

Tested by

no test coverage detected