Return platform users who can access an agent under current policy.
(db: AsyncSession, agent: Agent)
| 121 | |
| 122 | |
async def get_agent_accessible_user_ids(db: AsyncSession, agent: Agent) -> set[uuid.UUID]:
    """Collect the IDs of every user permitted to use ``agent``.

    The agent's creator (when ``creator_id`` is set) is always part of the
    result. The remaining members depend on ``agent.access_mode``; a missing
    or empty mode is treated as ``"company"``:

    * ``"company"`` -- every active user in the agent's tenant.
    * ``"custom"`` -- users named by ``AgentPermission`` rows whose
      ``scope_type`` is ``"user"``, plus the tenant's active users whose role
      is ``platform_admin`` or ``org_admin``.
    * any other value -- the creator only; no query is issued.

    Args:
        db: Session used to run the lookup queries.
        agent: Agent whose audience is being resolved.

    Returns:
        The set of user IDs that may access the agent.
    """
    mode = getattr(agent, "access_mode", None) or "company"

    creator_id = agent.creator_id
    accessible: set[uuid.UUID] = {creator_id} if creator_id else set()

    if mode == "company":
        tenant_users = select(User.id).where(
            User.tenant_id == agent.tenant_id,
            User.is_active == True,  # noqa: E712
        )
        for row in (await db.execute(tenant_users)).fetchall():
            accessible.add(row[0])
        return accessible

    # Modes other than "custom" grant nothing beyond the creator.
    if mode != "custom":
        return accessible

    # Users that were granted access to this agent individually.
    explicit_grants = select(AgentPermission.scope_id).where(
        AgentPermission.agent_id == agent.id,
        AgentPermission.scope_type == "user",
        AgentPermission.scope_id.isnot(None),
    )
    grant_rows = (await db.execute(explicit_grants)).fetchall()
    # The SQL filter already drops NULLs; the truthiness check is kept as a
    # second guard against empty scope IDs.
    accessible |= {row[0] for row in grant_rows if row[0]}

    # NOTE(review): the admin lookup is assumed to apply to "custom" mode only
    # (it directly follows the grant lookup) -- confirm against the caller's
    # expectations for other modes.
    tenant_admins = select(User.id).where(
        User.tenant_id == agent.tenant_id,
        User.is_active == True,  # noqa: E712
        User.role.in_(["platform_admin", "org_admin"]),
    )
    admin_rows = (await db.execute(tenant_admins)).fetchall()
    accessible |= {row[0] for row in admin_rows}

    return accessible
| 159 | |
| 160 | |
| 161 | def _agent_available(agent: Agent | None) -> tuple[bool, str | None]: |
no test coverage detected