StreamProgress opens a gRPC progress stream and calls the callback for each update.
(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent))
| 379 | |
| 380 | // StreamProgress opens a gRPC progress stream and calls the callback for each update. |
| 381 | func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent)) error { |
| 382 | s.mu.Lock() |
| 383 | job, ok := s.jobs.Get(jobID) |
| 384 | if !ok { |
| 385 | s.mu.Unlock() |
| 386 | return fmt.Errorf("job not found: %s", jobID) |
| 387 | } |
| 388 | if userID != "" && job.UserID != userID { |
| 389 | s.mu.Unlock() |
| 390 | return fmt.Errorf("job not found: %s", jobID) |
| 391 | } |
| 392 | s.mu.Unlock() |
| 393 | |
| 394 | streamModelID := job.ModelID |
| 395 | if streamModelID == "" { |
| 396 | streamModelID = job.Backend + "-quantize" |
| 397 | } |
| 398 | backendModel, err := s.modelLoader.Load( |
| 399 | model.WithBackendString(job.Backend), |
| 400 | model.WithModel(job.Backend), |
| 401 | model.WithModelID(streamModelID), |
| 402 | ) |
| 403 | if err != nil { |
| 404 | return fmt.Errorf("failed to load backend: %w", err) |
| 405 | } |
| 406 | |
| 407 | return backendModel.QuantizationProgress(ctx, &pb.QuantizationProgressRequest{ |
| 408 | JobId: jobID, |
| 409 | }, func(update *pb.QuantizationProgressUpdate) { |
| 410 | // Update job status and persist |
| 411 | s.mu.Lock() |
| 412 | if j, ok := s.jobs.Get(jobID); ok { |
| 413 | // Don't let progress updates overwrite terminal states |
| 414 | isTerminal := j.Status == "stopped" || j.Status == "completed" || j.Status == "failed" |
| 415 | if !isTerminal { |
| 416 | j.Status = update.Status |
| 417 | } |
| 418 | if update.Message != "" { |
| 419 | j.Message = update.Message |
| 420 | } |
| 421 | if update.OutputFile != "" { |
| 422 | j.OutputFile = update.OutputFile |
| 423 | } |
| 424 | if err := s.jobs.Set(ctx, j); err != nil { |
| 425 | xlog.Warn("Failed to persist progress update", "job_id", jobID, "error", err) |
| 426 | } |
| 427 | s.saveJobState(j) |
| 428 | } |
| 429 | s.mu.Unlock() |
| 430 | |
| 431 | // Convert extra metrics |
| 432 | extraMetrics := make(map[string]float32) |
| 433 | for k, v := range update.ExtraMetrics { |
| 434 | extraMetrics[k] = v |
| 435 | } |
| 436 | |
| 437 | event := &schema.QuantizationProgressEvent{ |
| 438 | JobID: update.JobId, |
nothing calls this directly
no test coverage detected