MCPcopy
hub / github.com/dataelement/Clawith / write_workspace_file

Function write_workspace_file

backend/app/services/workspace_collaboration.py:269–341  ·  view source on GitHub ↗

Write text content, enforcing human locks for agent/system actors.

(
    db: AsyncSession,
    *,
    agent_id: uuid.UUID,
    base_dir: Path,
    path: str,
    content: str,
    actor_type: str,
    actor_id: uuid.UUID | None,
    operation: str = "write",
    session_id: str | None = None,
    enforce_human_lock: bool = True,
    merge_user_autosave: bool = False,
    expected_version_token: str | None = None,
)

Source from the content-addressed store, hash-verified

267
268
269async def write_workspace_file(
270 db: AsyncSession,
271 *,
272 agent_id: uuid.UUID,
273 base_dir: Path,
274 path: str,
275 content: str,
276 actor_type: str,
277 actor_id: uuid.UUID | None,
278 operation: str = "write",
279 session_id: str | None = None,
280 enforce_human_lock: bool = True,
281 merge_user_autosave: bool = False,
282 expected_version_token: str | None = None,
283) -> WorkspaceWriteResult:
284 """Write text content, enforcing human locks for agent/system actors."""
285 normalized = normalize_workspace_path(path)
286 if not normalized:
287 return WorkspaceWriteResult(False, normalized, "Missing file path")
288
289 if enforce_human_lock and actor_type != "user":
290 lock = await get_active_lock(db, agent_id=agent_id, path=normalized)
291 if lock:
292 return WorkspaceWriteResult(
293 False,
294 normalized,
295 (
296 f"Human is currently editing {normalized}. Do not modify it now. "
297 "Ask the user to finish editing, or work on another file."
298 ),
299 locked_by_user_id=str(lock.user_id),
300 )
301
302 storage = get_storage_backend()
303 storage_key = normalize_storage_key(f"{agent_id}/{normalized}")
304 local_base_available = _should_mirror_to_local_filesystem(storage)
305 try:
306 target = safe_agent_path(base_dir, normalized)
307 except Exception:
308 target = None
309 local_base_available = False
310 before = await storage.read_text(storage_key, encoding="utf-8", errors="replace") if await storage.exists(storage_key) else None
311 write_result = await storage.write_bytes_if_match(
312 storage_key,
313 content.encode("utf-8"),
314 condition=WriteCondition(version_token=expected_version_token) if expected_version_token is not None else None,
315 content_type="text/plain; charset=utf-8",
316 )
317 if not write_result.ok:
318 return WorkspaceWriteResult(False, normalized, f"Conflict detected while writing {normalized}")
319 if local_base_available and target is not None:
320 target.parent.mkdir(parents=True, exist_ok=True)
321 async with aiofiles.open(target, "w", encoding="utf-8") as f:
322 await f.write(content)
323
324 revision = await record_revision(
325 db,
326 agent_id=agent_id,

Callers 3

write_file · Function · 0.90
restore_file_revision · Function · 0.90

Calls 12

get_storage_backend · Function · 0.90
normalize_storage_key · Function · 0.90
WriteCondition · Class · 0.90
normalize_workspace_path · Function · 0.85
get_active_lock · Function · 0.85
safe_agent_path · Function · 0.85
record_revision · Function · 0.85
read_text · Method · 0.80
exists · Method · 0.45
write_bytes_if_match · Method · 0.45

Tested by

no test coverage detected