async def update_usergroup_by_id(
    request: Request,
    db_session: AsyncSession,
    current_user: PublicUser | AnonymousUser,
    usergroup_id: int,
    usergroup_update: UserGroupUpdate,
) -> UserGroupRead:
    """Update the name and/or description of the user group with this id.

    The group is loaded first so that the authorization check can be scoped
    to the organization recorded on the stored row (``org_id``) rather than
    to anything supplied by the caller. No attribute is modified until
    ``rbac_check`` has returned.

    Args:
        request: Incoming request, passed through to ``rbac_check``.
        db_session: Async database session used for the lookup and commit.
        current_user: Caller identity; may be an ``AnonymousUser``.
        usergroup_id: Primary key of the group to update.
        usergroup_update: Requested changes; a field left as ``None`` is
            not touched, so this path cannot reset a field to ``None``.

    Returns:
        The refreshed group, validated into ``UserGroupRead``.

    Raises:
        HTTPException: 404 when no group has this id. ``rbac_check`` is
            presumably what rejects unauthorized callers; how it fails is
            not visible in this function.
    """
    lookup = select(UserGroup).where(UserGroup.id == usergroup_id)
    result = await db_session.execute(lookup)
    group = result.scalars().first()

    # NOTE(review): this 404 is raised before the RBAC check, so an unknown id
    # and a known id the caller may not touch can produce different responses.
    # With integer ids that difference tells a caller outside the org which
    # ids exist; confirm that is acceptable for this endpoint.
    if not group:
        raise HTTPException(
            status_code=404,
            detail="UserGroup not found",
        )

    # Authorize against the org stored on the row, not a caller-supplied one,
    # so a member of another org cannot update this group by guessing its id
    # (cross-org IDOR).
    await rbac_check(
        request,
        usergroup_uuid=group.usergroup_uuid,
        current_user=current_user,
        action="update",
        db_session=db_session,
        org_id=group.org_id,
    )

    # Explicit allow-list: only these two attributes are ever copied from the
    # payload, so org_id and usergroup_uuid cannot be changed through this
    # path whatever else UserGroupUpdate carries.
    for field_name in ("name", "description"):
        requested = getattr(usergroup_update, field_name)
        if requested is not None:
            setattr(group, field_name, requested)

    # NOTE(review): datetime.now() without a tz argument is naive local time,
    # stored here as a string; confirm this matches how update_date is written
    # and compared elsewhere (e.g. when correlating audit timestamps).
    group.update_date = str(datetime.now())

    db_session.add(group)
    await db_session.commit()
    # Reload from the database so the response reflects what was persisted.
    await db_session.refresh(group)

    return UserGroupRead.model_validate(group)
| 388 | async def delete_usergroup_by_id( |