Move or rename a workspace file/folder while respecting edit locks.
(
db: AsyncSession,
*,
agent_id: uuid.UUID,
base_dir: Path,
source_path: str,
destination_path: str,
actor_type: str,
actor_id: uuid.UUID | None,
session_id: str | None = None,
enforce_human_lock: bool = True,
overwrite: bool = False,
expected_source_version_token: str | None = None,
expected_destination_version_token: str | None = None,
)
| 420 | |
| 421 | |
| 422 | async def move_workspace_path( |
| 423 | db: AsyncSession, |
| 424 | *, |
| 425 | agent_id: uuid.UUID, |
| 426 | base_dir: Path, |
| 427 | source_path: str, |
| 428 | destination_path: str, |
| 429 | actor_type: str, |
| 430 | actor_id: uuid.UUID | None, |
| 431 | session_id: str | None = None, |
| 432 | enforce_human_lock: bool = True, |
| 433 | overwrite: bool = False, |
| 434 | expected_source_version_token: str | None = None, |
| 435 | expected_destination_version_token: str | None = None, |
| 436 | ) -> WorkspaceWriteResult: |
| 437 | """Move or rename a workspace file/folder while respecting edit locks.""" |
| 438 | source_normalized = normalize_workspace_path(source_path) |
| 439 | destination_normalized = normalize_workspace_path(destination_path) |
| 440 | if not source_normalized: |
| 441 | return WorkspaceWriteResult(False, source_normalized, "Missing source path") |
| 442 | if not destination_normalized: |
| 443 | return WorkspaceWriteResult(False, destination_normalized, "Missing destination path") |
| 444 | if source_normalized in {"tasks.json", "soul.md"}: |
| 445 | return WorkspaceWriteResult(False, source_normalized, f"{source_normalized} cannot be moved (protected)") |
| 446 | |
| 447 | storage = get_storage_backend() |
| 448 | source_key = normalize_storage_key(f"{agent_id}/{source_normalized}") |
| 449 | source_exists = await storage.exists(source_key) |
| 450 | source_is_dir = await storage.is_dir(source_key) |
| 451 | if not source_exists and not source_is_dir: |
| 452 | return WorkspaceWriteResult(False, source_normalized, f"File not found: {source_normalized}") |
| 453 | |
| 454 | destination_key = normalize_storage_key(f"{agent_id}/{destination_normalized}") |
| 455 | destination_is_dir = await storage.is_dir(destination_key) |
| 456 | if destination_path.replace("\\", "/").strip().endswith("/") or destination_is_dir: |
| 457 | destination_normalized = normalize_workspace_path(f"{destination_normalized}/{Path(source_normalized).name}") |
| 458 | destination_key = normalize_storage_key(f"{agent_id}/{destination_normalized}") |
| 459 | |
| 460 | if source_normalized == destination_normalized: |
| 461 | return WorkspaceWriteResult(False, source_normalized, "Source and destination are the same") |
| 462 | if source_is_dir and (destination_normalized == source_normalized or destination_normalized.startswith(source_normalized + "/")): |
| 463 | return WorkspaceWriteResult(False, source_normalized, "Cannot move a folder into itself") |
| 464 | |
| 465 | if enforce_human_lock and actor_type != "user": |
| 466 | for locked_path in (source_normalized, destination_normalized): |
| 467 | lock = await get_active_lock(db, agent_id=agent_id, path=locked_path) |
| 468 | if lock: |
| 469 | return WorkspaceWriteResult( |
| 470 | False, |
| 471 | locked_path, |
| 472 | ( |
| 473 | f"Human is currently editing {locked_path}. Do not move it now. " |
| 474 | "Ask the user to finish editing, or choose another path." |
| 475 | ), |
| 476 | locked_by_user_id=str(lock.user_id), |
| 477 | ) |
| 478 | |
| 479 | destination_exists = await storage.exists(destination_key) |
No test coverage detected.