Load users from a JSON file. Expected JSON format: { "users": [ { "username": "user@example.com", "email": "user@example.com", "password": "password123", "role": "User",
(input_file: str,
sqlite_path: Optional[str] = None)
| 166 | @app.command() |
| 167 | @update_sqlite_path |
| 168 | def load_users(input_file: str, |
| 169 | sqlite_path: Optional[str] = None): |
| 170 | """Load users from a JSON file. |
| 171 | |
| 172 | Expected JSON format: |
| 173 | { |
| 174 | "users": [ |
| 175 | { |
| 176 | "username": "user@example.com", |
| 177 | "email": "user@example.com", |
| 178 | "password": "password123", |
| 179 | "role": "User", |
| 180 | "active": true, |
| 181 | "auth_source": "internal" |
| 182 | }, |
| 183 | { |
| 184 | "username": "ldap_user", |
| 185 | "email": "ldap@example.com", |
| 186 | "role": "Administrator", |
| 187 | "active": true, |
| 188 | "auth_source": "ldap" |
| 189 | } |
| 190 | ] |
| 191 | } |
| 192 | """ |
| 193 | from urllib.parse import unquote |
| 194 | |
| 195 | print('----------') |
| 196 | print('Loading users from:', input_file) |
| 197 | print('SQLite pgAdmin config:', config.SQLITE_PATH) |
| 198 | print('----------') |
| 199 | |
| 200 | # Parse the input file path |
| 201 | try: |
| 202 | file_path = unquote(input_file) |
| 203 | except Exception as e: |
| 204 | return _handle_error(str(e), True) |
| 205 | |
| 206 | # Read and parse JSON file |
| 207 | try: |
| 208 | with open(file_path) as f: |
| 209 | data = jsonlib.load(f) |
| 210 | except jsonlib.decoder.JSONDecodeError as e: |
| 211 | return _handle_error( |
| 212 | gettext("Error parsing input file %s: %s" % (file_path, e)), |
| 213 | True) |
| 214 | except Exception as e: |
| 215 | return _handle_error( |
| 216 | gettext("Error reading input file %s: [%d] %s" % |
| 217 | (file_path, e.errno, e.strerror)), True) |
| 218 | |
| 219 | # Validate JSON structure |
| 220 | if 'users' not in data: |
| 221 | return _handle_error( |
| 222 | gettext("Invalid JSON format: 'users' key not found"), True) |
| 223 | |
| 224 | users_data = data['users'] |
| 225 | if not isinstance(users_data, list): |
nothing calls this directly
no test coverage detected