(self, name: str, task_id: int = None, base_ref: str = "HEAD")
| 282 | ) |
| 283 | |
def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str:
    """Create a git worktree on a new ``wt/<name>`` branch and record it in the index.

    Emits ``worktree.create.before`` ahead of the git call, then either
    ``worktree.create.after`` on success or ``worktree.create.failed`` (with
    the stringified error) before re-raising whatever went wrong.

    Args:
        name: Worktree name; checked by ``self._validate_name`` and used both
            as the directory name under ``self.dir`` and as the branch suffix.
        task_id: Optional task to bind the new worktree to. ``None`` creates
            an unbound worktree.
        base_ref: Git ref the new branch is created from.

    Returns:
        The new index entry serialized as indented JSON.

    Raises:
        ValueError: If ``name`` is already present in the index, or
            ``task_id`` is given but ``self.tasks`` does not know it.
        Exception: Any error from git, the index, task binding or event
            emission is re-raised unchanged after the failure event.
    """
    self._validate_name(name)
    if self._find(name):
        raise ValueError(f"Worktree '{name}' already exists in index")
    if task_id is not None and not self.tasks.exists(task_id):
        raise ValueError(f"Task {task_id} not found")

    path = self.dir / name
    branch = f"wt/{name}"
    # Payload shared by all three events; each emit receives its own copy so
    # a listener that keeps or mutates the dict cannot affect a later event.
    task_ref = {"id": task_id} if task_id is not None else {}

    self.events.emit(
        "worktree.create.before",
        task=dict(task_ref),
        worktree={"name": name, "base_ref": base_ref},
    )

    git_added = False  # `git worktree add` completed
    indexed = False    # entry handed to _save_index without error
    try:
        self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
        git_added = True

        entry = {
            "name": name,
            "path": str(path),
            "branch": branch,
            "task_id": task_id,
            "status": "active",
            "created_at": time.time(),
        }

        idx = self._load_index()
        idx["worktrees"].append(entry)
        self._save_index(idx)
        indexed = True

        if task_id is not None:
            self.tasks.bind_worktree(task_id, name)

        self.events.emit(
            "worktree.create.after",
            task=dict(task_ref),
            worktree={
                "name": name,
                "path": str(path),
                "branch": branch,
                "status": "active",
            },
        )
        return json.dumps(entry, indent=2)
    except Exception as e:
        if git_added and not indexed:
            # The worktree and branch exist on disk but never reached the
            # index, so nothing could manage them and a retry with the same
            # name would fail in git. Undo exactly what this call created.
            # NOTE(review): assumes a failing _save_index did not persist the
            # entry - confirm against its implementation.
            for undo in (
                ["worktree", "remove", "--force", str(path)],
                ["branch", "-D", branch],
            ):
                try:
                    self._run_git(undo)
                except Exception:
                    # Deliberate best-effort: a cleanup failure must not mask
                    # the original error, which is re-raised below.
                    pass
        self.events.emit(
            "worktree.create.failed",
            task=dict(task_ref),
            worktree={"name": name, "base_ref": base_ref},
            error=str(e),
        )
        raise
| 336 | |
| 337 | def list_all(self) -> str: |
| 338 | idx = self._load_index() |
nothing calls this directly
no test coverage detected