MCPcopy
hub / github.com/dataelement/Clawith / move_workspace_path

Function move_workspace_path

backend/app/services/workspace_collaboration.py:422–573  ·  view source on GitHub ↗

Move or rename a workspace file/folder while respecting edit locks.

(
    db: AsyncSession,
    *,
    agent_id: uuid.UUID,
    base_dir: Path,
    source_path: str,
    destination_path: str,
    actor_type: str,
    actor_id: uuid.UUID | None,
    session_id: str | None = None,
    enforce_human_lock: bool = True,
    overwrite: bool = False,
    expected_source_version_token: str | None = None,
    expected_destination_version_token: str | None = None,
)

Source from the content-addressed store, hash-verified

420
421
422async def move_workspace_path(
423 db: AsyncSession,
424 *,
425 agent_id: uuid.UUID,
426 base_dir: Path,
427 source_path: str,
428 destination_path: str,
429 actor_type: str,
430 actor_id: uuid.UUID | None,
431 session_id: str | None = None,
432 enforce_human_lock: bool = True,
433 overwrite: bool = False,
434 expected_source_version_token: str | None = None,
435 expected_destination_version_token: str | None = None,
436) -> WorkspaceWriteResult:
437 """Move or rename a workspace file/folder while respecting edit locks."""
438 source_normalized = normalize_workspace_path(source_path)
439 destination_normalized = normalize_workspace_path(destination_path)
440 if not source_normalized:
441 return WorkspaceWriteResult(False, source_normalized, "Missing source path")
442 if not destination_normalized:
443 return WorkspaceWriteResult(False, destination_normalized, "Missing destination path")
444 if source_normalized in {"tasks.json", "soul.md"}:
445 return WorkspaceWriteResult(False, source_normalized, f"{source_normalized} cannot be moved (protected)")
446
447 storage = get_storage_backend()
448 source_key = normalize_storage_key(f"{agent_id}/{source_normalized}")
449 source_exists = await storage.exists(source_key)
450 source_is_dir = await storage.is_dir(source_key)
451 if not source_exists and not source_is_dir:
452 return WorkspaceWriteResult(False, source_normalized, f"File not found: {source_normalized}")
453
454 destination_key = normalize_storage_key(f"{agent_id}/{destination_normalized}")
455 destination_is_dir = await storage.is_dir(destination_key)
456 if destination_path.replace("\\", "/").strip().endswith("/") or destination_is_dir:
457 destination_normalized = normalize_workspace_path(f"{destination_normalized}/{Path(source_normalized).name}")
458 destination_key = normalize_storage_key(f"{agent_id}/{destination_normalized}")
459
460 if source_normalized == destination_normalized:
461 return WorkspaceWriteResult(False, source_normalized, "Source and destination are the same")
462 if source_is_dir and (destination_normalized == source_normalized or destination_normalized.startswith(source_normalized + "/")):
463 return WorkspaceWriteResult(False, source_normalized, "Cannot move a folder into itself")
464
465 if enforce_human_lock and actor_type != "user":
466 for locked_path in (source_normalized, destination_normalized):
467 lock = await get_active_lock(db, agent_id=agent_id, path=locked_path)
468 if lock:
469 return WorkspaceWriteResult(
470 False,
471 locked_path,
472 (
473 f"Human is currently editing {locked_path}. Do not move it now. "
474 "Ask the user to finish editing, or choose another path."
475 ),
476 locked_by_user_id=str(lock.user_id),
477 )
478
479 destination_exists = await storage.exists(destination_key)

Callers 1

Calls 15

get_storage_backendFunction · 0.90
normalize_storage_keyFunction · 0.90
workspace_locksFunction · 0.90
WriteConditionClass · 0.90
normalize_workspace_pathFunction · 0.85
get_active_lockFunction · 0.85
safe_agent_pathFunction · 0.85
record_revisionFunction · 0.85
delete_if_matchMethod · 0.80

Tested by

no test coverage detected