MCPcopy Index your code
hub / github.com/riverqueue/river / Work

Method Work

riverlog/river_log.go:171–238  ·  view source on GitHub ↗
(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error)

Source from the content-addressed store, hash-verified

169}
170
171func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
172 var logBuf bytes.Buffer
173
174 switch {
175 case m.newCustomContext != nil:
176 ctx = m.newCustomContext(ctx, &logBuf)
177 case m.newSlogHandler != nil:
178 logger := slog.New(m.newSlogHandler(&logBuf))
179 ctx = context.WithValue(ctx, contextKey{}, logger)
180 default:
181 return errors.New("expected either newContextLogger or newSlogHandler to be set")
182 }
183
184 metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
185 if !hasMetadataUpdates {
186 return errors.New("expected to find metadata updates in context, but didn't")
187 }
188
189 // This all runs invariant of whether the job panics or returns an error.
190 defer func() {
191 logBytes := logBuf.Bytes()
192
193 // Return early if nothing ended up getting logged.
194 if len(logBytes) < 1 {
195 return
196 }
197
198 // Postgres JSONB is limited to 255MB, but it would be a bad idea to get
199 // anywhere close to that limit here.
200 if len(logBytes) > m.config.MaxSizeBytes {
201 m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded maximum; truncating",
202 slog.Int("logs_size", len(logBytes)),
203 slog.Int("max_size", m.config.MaxSizeBytes),
204 )
205 logBytes = logBytes[0:m.config.MaxSizeBytes]
206 }
207
208 newLogEntryBytes, err := json.Marshal(logAttempt{
209 Attempt: job.Attempt,
210 Log: string(logBytes),
211 })
212 if err != nil {
213 m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
214 slog.Any("error", err),
215 )
216 return
217 }
218
219 allLogDataBytes, numDroppedEntries, err := appendLogDataWithCap(job.Metadata, newLogEntryBytes, m.config.MaxTotalBytes)
220 if err != nil {
221 m.Logger.ErrorContext(ctx, m.Name+": Error marshaling log data",
222 slog.Any("error", err),
223 )
224 return
225 }
226
227 if numDroppedEntries > 0 {
228 m.Logger.WarnContext(ctx, m.Name+": Logs size exceeded total maximum; dropping oldest entries",

Callers 1

Calls 2

appendLogDataWithCapFunction · 0.85

Tested by 1