(username: str, password: str)
| 22 | |
| 23 | |
| 24 | async def register_admin(username: str, password: str) -> Admin: |
| 25 | admin_id = Admin.generate_random_id() |
| 26 | salt, password_hash = _generate_password_hash(password) |
| 27 | token = generate_token(admin_id, []) |
| 28 | |
| 29 | # 1. create admin in database |
| 30 | async with postgres_pool.get_db_connection() as conn: |
| 31 | async with conn.transaction(): |
| 32 | row = await conn.fetchrow( |
| 33 | """ |
| 34 | INSERT INTO app_admin (admin_id, username, password_hash, salt, token) |
| 35 | VALUES ($1, $2, $3, $4, $5) |
| 36 | RETURNING * |
| 37 | """, |
| 38 | admin_id, |
| 39 | username, |
| 40 | password_hash, |
| 41 | salt, |
| 42 | token, |
| 43 | ) |
| 44 | |
| 45 | # 2. write to redis and return |
| 46 | admin = Admin.build(row) |
| 47 | await admin_ops.redis.set(admin) |
| 48 | |
| 49 | logger.info(f"Registered admin {username}") |
| 50 | |
| 51 | return admin |
nothing calls this directly
no test coverage detected