MCPcopy
hub / github.com/dataelement/Clawith / evaluate_agent_relationship_status

Function evaluate_agent_relationship_status

backend/app/core/permissions.py:171–248  ·  view source on GitHub ↗

Compute the effective status for an Agent -> Agent relationship.

(
    db: AsyncSession,
    rel: AgentAgentRelationship,
    *,
    current_user_id: uuid.UUID | None = None,
)

Source from the content-addressed store, hash-verified

169
170
171async def evaluate_agent_relationship_status(
172 db: AsyncSession,
173 rel: AgentAgentRelationship,
174 *,
175 current_user_id: uuid.UUID | None = None,
176) -> dict:
177 """Compute the effective status for an Agent -> Agent relationship."""
178 source_result = await db.execute(select(Agent).where(Agent.id == rel.agent_id))
179 source = source_result.scalar_one_or_none()
180 target = rel.__dict__.get("target_agent")
181 if target is None:
182 target_result = await db.execute(select(Agent).where(Agent.id == rel.target_agent_id))
183 target = target_result.scalar_one_or_none()
184
185 if not source or not target:
186 return {
187 "access_allowed": False,
188 "access_status": "missing_target",
189 "access_status_reason": "source_or_target_not_found",
190 }
191 if source.tenant_id != target.tenant_id:
192 return {
193 "access_allowed": False,
194 "access_status": "restricted",
195 "access_status_reason": "different_tenant",
196 }
197
198 available, reason = _agent_available(target)
199 if not available:
200 return {
201 "access_allowed": False,
202 "access_status": "restricted",
203 "access_status_reason": reason or "target_unavailable",
204 }
205
206 created_by_user_id = getattr(rel, "created_by_user_id", None)
207 if created_by_user_id:
208 if await user_can_manage_agent_id(db, created_by_user_id, source) and await user_can_manage_agent_id(db, created_by_user_id, target):
209 return {
210 "access_allowed": True,
211 "access_status": "active",
212 "access_status_reason": None,
213 }
214 return {
215 "access_allowed": False,
216 "access_status": "restricted",
217 "access_status_reason": "relationship_creator_no_longer_manages_both_agents",
218 }
219
220 target_mode = getattr(target, "access_mode", None) or "company"
221 if target_mode == "company":
222 return {
223 "access_allowed": True,
224 "access_status": "active",
225 "access_status_reason": None,
226 }
227
228 candidate_user_ids = [

Callers 6

_send_file_to_agentFunction · 0.90
_build_a2a_contextFunction · 0.90
get_agent_relationshipsFunction · 0.90
poll_messagesFunction · 0.90
send_messageFunction · 0.90

Calls 7

_agent_availableFunction · 0.85
user_can_manage_agent_idFunction · 0.85
whereMethod · 0.80
executeMethod · 0.45
scalar_one_or_noneMethod · 0.45
getMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected