(queryset)
| 83 | |
@staticmethod
def set_users_roles_for_cache(queryset):
    """Prime each user's org/system role caches from their role bindings.

    Looks up the ``RoleBinding`` rows for every user in ``queryset``, sorts
    the bound roles into "system" and "org" sets per user, and hands those
    sets to ``user.org_roles.cache_set`` and ``user.system_roles.cache_set``.

    A binding counts as a system binding when its ``scope`` equals
    ``RoleBinding.Scope.system``; every other scope is treated as an org
    binding. Users without any binding receive empty sets.

    Args:
        queryset: Iterable of user objects exposing ``id``, ``org_roles``
            and ``system_roles``. It is iterated twice, so it is presumably
            a Django QuerySet (whose result cache keeps both passes on the
            same instances) or a list -- TODO confirm against callers.

    Returns:
        The same ``queryset`` object that was passed in.

    Raises:
        KeyError: If a binding references a role id that is not present in
            ``Role.objects.all()``.
    """
    # TODO: implement this in SQL when there is a chance.
    users = queryset
    ids = [user.id for user in users]
    bindings = RoleBinding.objects.filter(user__in=ids).values(
        'user_id', 'role_id', 'scope'
    )

    # Load all roles once so each binding resolves with a dict lookup.
    roles_by_id = {role.id: role for role in Role.objects.all()}
    system_roles_by_user = defaultdict(set)
    org_roles_by_user = defaultdict(set)

    for row in bindings:
        role = roles_by_id[row['role_id']]
        is_system = row['scope'] == RoleBinding.Scope.system
        bucket = system_roles_by_user if is_system else org_roles_by_user
        bucket[row['user_id']].add(role)

    for user in users:
        uid = user.id
        # defaultdict lookups give an empty set to users with no bindings.
        system_roles, org_roles = system_roles_by_user[uid], org_roles_by_user[uid]
        user.org_roles.cache_set(org_roles)
        user.system_roles.cache_set(system_roles)
    return users
| 110 | |
| 111 | @staticmethod |
| 112 | def set_users_orgs_roles(queryset): |
no test coverage detected