param db_session: Optional session to use; if None, a new session will be created. Provide a transactional scope around a series of operations.
(db_session=None)
| 302 | |
@contextmanager
def get_db_session(db_session=None):
    """Provide a transactional scope around a series of operations.

    When no session is passed in, a new one is created from
    ``db_client.session_maker()``; it is committed after the ``with`` body
    finishes, rolled back if an exception is raised, and always closed.
    When the caller passes its own session, that session is yielded as-is
    and is never committed, rolled back, or closed here.

    :param db_session: Optional session to use. If None, a new session
        will be created.
    :yields: The session to use inside the ``with`` body.
    :raises Exception: Any exception raised by the ``with`` body (or by the
        commit) is logged and re-raised unchanged.
    """
    # Only a session created here is managed here; a caller-supplied
    # session's lifecycle stays with the caller.
    owns_session = db_session is None
    session = db_client.session_maker() if owns_session else db_session
    try:
        yield session
        # The commit stays inside the try so that a failing commit is
        # rolled back and logged like any other failure.
        if owns_session:
            session.commit()
    except Exception as e:
        if owns_session:
            session.rollback()
        # NOTE(review): logged for caller-supplied sessions as well; the
        # original nesting of this call was not recoverable -- confirm.
        logger.error(f"Database operation failed: {e}")
        # Bare raise re-raises the active exception with its traceback intact.
        raise
    finally:
        if owns_session:
            session.close()
| 322 | |
| 323 | |
| 324 | def as_dict(obj): |