(ctx context.Context, wg *sync.WaitGroup)
| 1031 | } |
| 1032 | |
| 1033 | func (p *producer) reportQueueStatusLoop(ctx context.Context, wg *sync.WaitGroup) { |
| 1034 | defer wg.Done() |
| 1035 | |
| 1036 | serviceutil.CancellableSleep(ctx, randutil.DurationBetween(0, time.Second)) |
| 1037 | reportTicker := time.NewTicker(p.config.QueueReportInterval) |
| 1038 | for { |
| 1039 | select { |
| 1040 | case <-ctx.Done(): |
| 1041 | reportTicker.Stop() |
| 1042 | return |
| 1043 | case <-reportTicker.C: |
| 1044 | p.reportQueueStatusOnce(ctx) |
| 1045 | } |
| 1046 | } |
| 1047 | } |
| 1048 | |
| 1049 | func (p *producer) reportQueueStatusOnce(ctx context.Context) { |
| 1050 | ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
no test coverage detected