MCPcopy
hub / github.com/dataelement/Clawith / evaluate_human_relationship_status

Function evaluate_human_relationship_status

backend/app/core/permissions.py:251–297  ·  view source on GitHub ↗

Compute the effective status for an Agent -> Human relationship.

(
    db: AsyncSession,
    rel: AgentRelationship,
    *,
    source_agent: Agent | None = None,
)

Source from the content-addressed store, hash-verified

249
250
async def evaluate_human_relationship_status(
    db: AsyncSession,
    rel: AgentRelationship,
    *,
    source_agent: Agent | None = None,
) -> dict:
    """Compute the effective status for an Agent -> Human relationship.

    Args:
        db: Session used to look up the agent and the member when they are
            not already available.
        rel: The relationship to evaluate; ``rel.agent_id`` and
            ``rel.member_id`` are used for the lookups.
        source_agent: Optional agent to use instead of querying by
            ``rel.agent_id``.

    Returns:
        A dict with three keys: ``access_allowed`` (bool),
        ``access_status`` (``"active"``, ``"restricted"`` or
        ``"missing_target"``) and ``access_status_reason`` (a reason code
        string, or ``None`` when access is allowed).
    """

    def _denied(status: str, reason: str) -> dict:
        # Every refusal shares the same shape; only status/reason vary.
        return {
            "access_allowed": False,
            "access_status": status,
            "access_status_reason": reason,
        }

    if source_agent is None:
        agent_query = select(Agent).where(Agent.id == rel.agent_id)
        source_agent = (await db.execute(agent_query)).scalar_one_or_none()

    # Read "member" straight from the instance __dict__ instead of via
    # attribute access. NOTE(review): presumably this avoids triggering a
    # lazy load on the relationship inside async code -- confirm.
    member = rel.__dict__.get("member")
    if member is None:
        member_query = select(OrgMember).where(OrgMember.id == rel.member_id)
        member = (await db.execute(member_query)).scalar_one_or_none()

    if not (source_agent and member):
        return _denied("missing_target", "agent_or_member_not_found")

    if member.status != "active":
        return _denied("restricted", "member_inactive")

    # A tenant mismatch only counts when both sides actually carry a tenant id.
    if (
        member.tenant_id
        and source_agent.tenant_id
        and member.tenant_id != source_agent.tenant_id
    ):
        return _denied("restricted", "different_tenant")

    # Members with a user_id must additionally hold some access level on
    # the agent; members without one skip this check.
    if member.user_id:
        granted_level = await get_agent_access_level_for_user_id(
            db, member.user_id, source_agent
        )
        if not granted_level:
            return _denied("restricted", "platform_user_no_agent_access")

    return {
        "access_allowed": True,
        "access_status": "active",
        "access_status_reason": None,
    }
298
299
300async def check_agent_access(db: AsyncSession, user: User, agent_id: uuid.UUID) -> Tuple[Agent, str]:

Callers 7

_send_feishu_message · Function · 0.90
_send_channel_message · Function · 0.90
_send_platform_message · Function · 0.90
get_relationships · Function · 0.90
poll_messages · Function · 0.90
send_message · Function · 0.90

Calls 5

where · Method · 0.80
execute · Method · 0.45
scalar_one_or_none · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected