Flush buffered worker heartbeat updates to the database; intended to be called periodically (each call performs one flush). Uses a single UPDATE statement to set the same timestamp on all buffered workers.
()
| 23 | |
| 24 | |
async def flush_heartbeats():
    """
    Flush buffered worker heartbeats to the database.

    Drains ``heartbeat_flush_buffer`` while holding
    ``heartbeat_flush_buffer_lock`` and issues a single UPDATE that sets
    ``heartbeat_time`` to the current UTC time (truncated to whole seconds)
    for every drained worker id. Each call performs one flush; it is
    intended to be invoked periodically by the caller.

    Any exception raised while talking to the database is logged with its
    traceback and swallowed. The drained ids are not re-queued on failure.
    """
    # Cheap unlocked fast path: nothing buffered, nothing to do.
    if not heartbeat_flush_buffer:
        return

    # Snapshot and clear under the lock so ids added afterwards are picked
    # up by the next flush instead of being lost.
    async with heartbeat_flush_buffer_lock:
        local_buffer = set(heartbeat_flush_buffer)
        heartbeat_flush_buffer.clear()

    # The unlocked check above may be stale by the time the lock is acquired
    # (e.g. a concurrent flush drained the buffer while we waited). Skip the
    # session and the no-op ``IN ()`` UPDATE in that case.
    if not local_buffer:
        return

    try:
        # One timestamp shared by every worker in this batch, e.g.
        # UPDATE workers SET heartbeat_time = '2024-01-27 10:00:00'
        # WHERE id IN (1, 2, 3, ...)
        heartbeat_time = datetime.datetime.now(datetime.timezone.utc).replace(
            microsecond=0
        )

        async with async_session() as session:
            stmt = (
                update(Worker)
                .where(Worker.id.in_(local_buffer))
                .values(heartbeat_time=heartbeat_time)
            )

            await session.execute(stmt)
            await session.commit()
    except Exception as e:
        # Best-effort flush: keep the caller alive, but record the traceback.
        logger.exception(f"Error flushing heartbeats to DB: {e}")
| 56 | |
| 57 | |
| 58 | async def flush_worker_status(): |
no test coverage detected