(ctx context.Context, r io.Reader, p events.Publisher[Event])
| 454 | } |
| 455 | |
| 456 | func (d *Client) sendPullLogs(ctx context.Context, r io.Reader, p events.Publisher[Event]) error { |
| 457 | plf := pullLogFormatter{Known: map[string]*pullInfo{}} |
| 458 | |
| 459 | scanner := bufio.NewScanner(r) |
| 460 | for scanner.Scan() { |
| 461 | log := jsonmessage.JSONMessage{} |
| 462 | if err := json.Unmarshal(scanner.Bytes(), &log); err != nil { |
| 463 | return fmt.Errorf("error parsing log message: %#v: %w", log, err) |
| 464 | } |
| 465 | |
| 466 | logMsg := plf.Update(log) |
| 467 | if logMsg == nil { |
| 468 | continue |
| 469 | } |
| 470 | |
| 471 | if err := p.Publish(ctx, NewLogEvent(model.LogLevelInfo, *logMsg)); err != nil { |
| 472 | return err |
| 473 | } |
| 474 | } |
| 475 | // Always print the complete progress bar, regardless of the backoff time. |
| 476 | finalLogMsg := plf.RenderProgress() |
| 477 | if err := p.Publish(ctx, NewLogEvent(model.LogLevelInfo, finalLogMsg)); err != nil { |
| 478 | return err |
| 479 | } |
| 480 | return scanner.Err() |
| 481 | } |
no test coverage detected