Enhanced ProgressTracker that sends updates via Server-Sent Events
| 61 | cls.remove_connection(session_id) |
| 62 | |
| 63 | class RealtimeProgressTracker(ProgressTracker): |
| 64 | """Enhanced ProgressTracker that sends updates via Server-Sent Events""" |
| 65 | |
| 66 | def __init__(self, total_items: int, operation_name: str, session_id: str): |
| 67 | super().__init__(total_items, operation_name) |
| 68 | self.session_id = session_id |
| 69 | self.last_update = 0 |
| 70 | self.update_interval = 1 # Update every 1 second |
| 71 | |
| 72 | # Initialize session progress |
| 73 | ACTIVE_PROGRESS_SESSIONS[session_id] = { |
| 74 | "operation_name": operation_name, |
| 75 | "total_items": total_items, |
| 76 | "processed_items": 0, |
| 77 | "errors_encountered": 0, |
| 78 | "start_time": self.start_time, |
| 79 | "status": "running", |
| 80 | "current_step": "", |
| 81 | "eta_seconds": 0, |
| 82 | "throughput": 0, |
| 83 | "progress_percentage": 0 |
| 84 | } |
| 85 | |
| 86 | # Send initial progress update |
| 87 | self._send_progress_update() |
| 88 | |
| 89 | def update(self, items_processed: int, errors: int = 0, current_step: str = ""): |
| 90 | """Update progress and send notification""" |
| 91 | super().update(items_processed, errors) |
| 92 | |
| 93 | # Update session data |
| 94 | session_data = ACTIVE_PROGRESS_SESSIONS.get(self.session_id) |
| 95 | if session_data: |
| 96 | session_data.update({ |
| 97 | "processed_items": self.processed_items, |
| 98 | "errors_encountered": self.errors_encountered, |
| 99 | "current_step": current_step, |
| 100 | "progress_percentage": (self.processed_items / self.total_items) * 100, |
| 101 | }) |
| 102 | |
| 103 | # Calculate throughput and ETA |
| 104 | elapsed = time.time() - self.start_time |
| 105 | if elapsed > 0: |
| 106 | session_data["throughput"] = self.processed_items / elapsed |
| 107 | remaining = self.total_items - self.processed_items |
| 108 | session_data["eta_seconds"] = remaining / session_data["throughput"] if session_data["throughput"] > 0 else 0 |
| 109 | |
| 110 | # Send update if enough time has passed |
| 111 | current_time = time.time() |
| 112 | if current_time - self.last_update >= self.update_interval: |
| 113 | self._send_progress_update() |
| 114 | self.last_update = current_time |
| 115 | |
| 116 | def finish(self): |
| 117 | """Mark progress as finished and send final update""" |
| 118 | super().finish() |
| 119 | |
| 120 | # Update session status |
no outgoing calls
no test coverage detected