MCPcopy
hub / github.com/mudler/LocalAI / StreamProgress

Method StreamProgress

core/services/quantization/service.go:381–447  ·  view source on GitHub ↗

StreamProgress opens a gRPC progress stream and calls the callback for each update.

(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent))

Source from the content-addressed store, hash-verified

379
380// StreamProgress opens a gRPC progress stream and calls the callback for each update.
381func (s *QuantizationService) StreamProgress(ctx context.Context, userID, jobID string, callback func(event *schema.QuantizationProgressEvent)) error {
382 s.mu.Lock()
383 job, ok := s.jobs.Get(jobID)
384 if !ok {
385 s.mu.Unlock()
386 return fmt.Errorf("job not found: %s", jobID)
387 }
388 if userID != "" && job.UserID != userID {
389 s.mu.Unlock()
390 return fmt.Errorf("job not found: %s", jobID)
391 }
392 s.mu.Unlock()
393
394 streamModelID := job.ModelID
395 if streamModelID == "" {
396 streamModelID = job.Backend + "-quantize"
397 }
398 backendModel, err := s.modelLoader.Load(
399 model.WithBackendString(job.Backend),
400 model.WithModel(job.Backend),
401 model.WithModelID(streamModelID),
402 )
403 if err != nil {
404 return fmt.Errorf("failed to load backend: %w", err)
405 }
406
407 return backendModel.QuantizationProgress(ctx, &pb.QuantizationProgressRequest{
408 JobId: jobID,
409 }, func(update *pb.QuantizationProgressUpdate) {
410 // Update job status and persist
411 s.mu.Lock()
412 if j, ok := s.jobs.Get(jobID); ok {
413 // Don't let progress updates overwrite terminal states
414 isTerminal := j.Status == "stopped" || j.Status == "completed" || j.Status == "failed"
415 if !isTerminal {
416 j.Status = update.Status
417 }
418 if update.Message != "" {
419 j.Message = update.Message
420 }
421 if update.OutputFile != "" {
422 j.OutputFile = update.OutputFile
423 }
424 if err := s.jobs.Set(ctx, j); err != nil {
425 xlog.Warn("Failed to persist progress update", "job_id", jobID, "error", err)
426 }
427 s.saveJobState(j)
428 }
429 s.mu.Unlock()
430
431 // Convert extra metrics
432 extraMetrics := make(map[string]float32)
433 for k, v := range update.ExtraMetrics {
434 extraMetrics[k] = v
435 }
436
437 event := &schema.QuantizationProgressEvent{
438 JobID: update.JobId,

Callers

nothing calls this directly

Calls 10

saveJobState (Method) · 0.95
WithBackendString (Function) · 0.92
WithModel (Function) · 0.92
WithModelID (Function) · 0.92
Lock (Method) · 0.65
Get (Method) · 0.65
Unlock (Method) · 0.65
Load (Method) · 0.65
QuantizationProgress (Method) · 0.65
Set (Method) · 0.65

Tested by

no test coverage detected