(ctx context.Context)
| 1047 | } |
| 1048 | |
| 1049 | func (p *producer) reportQueueStatusOnce(ctx context.Context) { |
| 1050 | ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 1051 | defer cancel() |
| 1052 | |
| 1053 | p.Logger.DebugContext(ctx, p.Name+": Reporting queue status", slog.String("queue", p.config.Queue)) |
| 1054 | _, err := p.exec.QueueCreateOrSetUpdatedAt(ctx, &riverdriver.QueueCreateOrSetUpdatedAtParams{ |
| 1055 | Metadata: []byte("{}"), |
| 1056 | Name: p.config.Queue, |
| 1057 | Now: p.Time.NowOrNil(), |
| 1058 | Schema: p.config.Schema, |
| 1059 | }) |
| 1060 | if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) { |
| 1061 | return |
| 1062 | } |
| 1063 | if err != nil { |
| 1064 | p.Logger.ErrorContext(ctx, p.Name+": Queue status update, error updating in database", slog.String("err", err.Error())) |
| 1065 | return |
| 1066 | } |
| 1067 | p.testSignals.ReportedQueueStatus.Signal(struct{}{}) |
| 1068 | } |
| 1069 | |
| 1070 | type producerFetchResult struct { |
| 1071 | jobs []*rivertype.JobRow |
no test coverage detected