MCPcopy
hub / github.com/PromtEngineer/localGPT / RealtimeProgressTracker

Class RealtimeProgressTracker

rag_system/api_server_with_progress.py:63–143  ·  view source on GitHub ↗

Enhanced ProgressTracker that sends updates via Server-Sent Events

Source from the content-addressed store, hash-verified

61 cls.remove_connection(session_id)
62
63class RealtimeProgressTracker(ProgressTracker):
64 """Enhanced ProgressTracker that sends updates via Server-Sent Events"""
65
def __init__(self, total_items: int, operation_name: str, session_id: str):
    """Set up the tracker, register its session record and push a first update.

    Args:
        total_items: Passed to ``ProgressTracker.__init__`` and stored in the
            session record as ``total_items``.
        operation_name: Passed to ``ProgressTracker.__init__`` and stored in
            the session record as ``operation_name``.
        session_id: Key under which this tracker's record is stored in
            ``ACTIVE_PROGRESS_SESSIONS``.
    """
    super().__init__(total_items, operation_name)

    # Throttling state read by update(): the minimum gap (in seconds) between
    # pushes and the timestamp of the last push made from update().
    self.update_interval = 1  # Update every 1 second
    self.last_update = 0
    self.session_id = session_id

    # Fresh session record: all counters and derived metrics start at zero.
    # NOTE(review): self.start_time is presumably set by
    # ProgressTracker.__init__ -- confirm against the base class.
    initial_record = dict(
        operation_name=operation_name,
        total_items=total_items,
        processed_items=0,
        errors_encountered=0,
        start_time=self.start_time,
        status="running",
        current_step="",
        eta_seconds=0,
        throughput=0,
        progress_percentage=0,
    )
    ACTIVE_PROGRESS_SESSIONS[session_id] = initial_record

    # Announce the newly registered session straight away.
    self._send_progress_update()
88
def update(self, items_processed: int, errors: int = 0, current_step: str = ""):
    """Update progress, refresh the session record and send a throttled notification.

    Args:
        items_processed: Forwarded unchanged to ``ProgressTracker.update``.
        errors: Forwarded unchanged to ``ProgressTracker.update``.
        current_step: Label stored in the session record as ``current_step``.
    """
    super().update(items_processed, errors)

    # Update session data (skipped when the session record is missing or empty).
    session_data = ACTIVE_PROGRESS_SESSIONS.get(self.session_id)
    if session_data:
        # A tracker created with total_items == 0 would otherwise raise
        # ZeroDivisionError here; report 0% for an empty workload instead.
        if self.total_items:
            progress_percentage = (self.processed_items / self.total_items) * 100
        else:
            progress_percentage = 0

        session_data.update({
            "processed_items": self.processed_items,
            "errors_encountered": self.errors_encountered,
            "current_step": current_step,
            "progress_percentage": progress_percentage,
        })

        # Calculate throughput (items per second) and ETA (seconds).
        elapsed = time.time() - self.start_time
        if elapsed > 0:
            throughput = self.processed_items / elapsed
            session_data["throughput"] = throughput
            remaining = self.total_items - self.processed_items
            # With nothing processed yet the throughput is 0, so no ETA can
            # be derived; 0 is stored as the placeholder.
            session_data["eta_seconds"] = remaining / throughput if throughput > 0 else 0

    # Send update only if at least update_interval seconds have passed since
    # the last one sent from this method.
    current_time = time.time()
    if current_time - self.last_update >= self.update_interval:
        self._send_progress_update()
        self.last_update = current_time
115
116 def finish(self):
117 """Mark progress as finished and send final update"""
118 super().finish()
119
120 # Update session status

Callers 1

Calls

no outgoing calls

Tested by

no test coverage detected