MCPcopy Index your code
hub / github.com/riverqueue/river / reportQueueStatusOnce

Method reportQueueStatusOnce

producer.go:1049–1068  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

1047}
1048
1049func (p *producer) reportQueueStatusOnce(ctx context.Context) {
1050 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
1051 defer cancel()
1052
1053 p.Logger.DebugContext(ctx, p.Name+": Reporting queue status", slog.String("queue", p.config.Queue))
1054 _, err := p.exec.QueueCreateOrSetUpdatedAt(ctx, &riverdriver.QueueCreateOrSetUpdatedAtParams{
1055 Metadata: []byte("{}"),
1056 Name: p.config.Queue,
1057 Now: p.Time.NowOrNil(),
1058 Schema: p.config.Schema,
1059 })
1060 if err != nil && errors.Is(context.Cause(ctx), startstop.ErrStop) {
1061 return
1062 }
1063 if err != nil {
1064 p.Logger.ErrorContext(ctx, p.Name+": Queue status update, error updating in database", slog.String("err", err.Error()))
1065 return
1066 }
1067 p.testSignals.ReportedQueueStatus.Signal(struct{}{})
1068}
1069
1070type producerFetchResult struct {
1071 jobs []*rivertype.JobRow

Callers 1

reportQueueStatusLoopMethod · 0.95

Calls 5

SignalMethod · 0.80
NowOrNilMethod · 0.65
IsMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected