(ctx context.Context)
| 1343 | } |
| 1344 | |
| 1345 | func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { |
| 1346 | logger := log.With(internal.LoggerFromCtx(ctx), slog.String("scheduledTask", "RecordSlotSizes")) |
| 1347 | logger.Info("Recording Slot Information") |
| 1348 | slotMetricGauges := otel_metrics.SlotMetricGauges{} |
| 1349 | slotMetricGauges.SlotLagGauge = a.OtelManager.Metrics.SlotLagGauge |
| 1350 | slotMetricGauges.RestartLSNGauge = a.OtelManager.Metrics.RestartLSNGauge |
| 1351 | slotMetricGauges.ConfirmedFlushLSNGauge = a.OtelManager.Metrics.ConfirmedFlushLSNGauge |
| 1352 | slotMetricGauges.SentLSNGauge = a.OtelManager.Metrics.SentLSNGauge |
| 1353 | slotMetricGauges.CurrentWalLSNGauge = a.OtelManager.Metrics.CurrentWalLSNGauge |
| 1354 | slotMetricGauges.RestartToConfirmedMBGauge = a.OtelManager.Metrics.RestartToConfirmedMBGauge |
| 1355 | slotMetricGauges.ConfirmedToCurrentMBGauge = a.OtelManager.Metrics.ConfirmedToCurrentMBGauge |
| 1356 | slotMetricGauges.SafeWalSizeGauge = a.OtelManager.Metrics.SafeWalSizeGauge |
| 1357 | slotMetricGauges.SlotActiveGauge = a.OtelManager.Metrics.SlotActiveGauge |
| 1358 | slotMetricGauges.WalSenderStateGauge = a.OtelManager.Metrics.WalSenderStateGauge |
| 1359 | slotMetricGauges.WalStatusGauge = a.OtelManager.Metrics.WalStatusGauge |
| 1360 | slotMetricGauges.LogicalDecodingWorkMemGauge = a.OtelManager.Metrics.LogicalDecodingWorkMemGauge |
| 1361 | slotMetricGauges.StatsResetGauge = a.OtelManager.Metrics.StatsResetGauge |
| 1362 | slotMetricGauges.SpillTxnsGauge = a.OtelManager.Metrics.SpillTxnsGauge |
| 1363 | slotMetricGauges.SpillCountGauge = a.OtelManager.Metrics.SpillCountGauge |
| 1364 | slotMetricGauges.SpillBytesGauge = a.OtelManager.Metrics.SpillBytesGauge |
| 1365 | slotMetricGauges.OpenConnectionsGauge = a.OtelManager.Metrics.OpenConnectionsGauge |
| 1366 | slotMetricGauges.OpenReplicationConnectionsGauge = a.OtelManager.Metrics.OpenReplicationConnectionsGauge |
| 1367 | slotMetricGauges.IntervalSinceLastNormalizeGauge = a.OtelManager.Metrics.IntervalSinceLastNormalizeGauge |
| 1368 | |
| 1369 | logger.Info("Querying for flows to emit slot metrics") |
| 1370 | rows, err := a.CatalogPool.Query(ctx, |
| 1371 | "SELECT DISTINCT ON (name) name, config_proto, workflow_id, updated_at FROM flows WHERE query_string IS NULL") |
| 1372 | if err != nil { |
| 1373 | logger.Error("failed to query all flows", slog.Any("error", err)) |
| 1374 | return fmt.Errorf("failed to query all flows for metrics: %w", err) |
| 1375 | } |
| 1376 | |
| 1377 | infos, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (flowInformation, error) { |
| 1378 | var flowName string |
| 1379 | var configProto []byte |
| 1380 | var workflowID string |
| 1381 | var updatedAt time.Time |
| 1382 | if err := rows.Scan(&flowName, &configProto, &workflowID, &updatedAt); err != nil { |
| 1383 | return flowInformation{}, err |
| 1384 | } |
| 1385 | |
| 1386 | var config protos.FlowConnectionConfigsCore |
| 1387 | if err := proto.Unmarshal(configProto, &config); err != nil { |
| 1388 | return flowInformation{}, err |
| 1389 | } |
| 1390 | |
| 1391 | return flowInformation{ |
| 1392 | config: &config, |
| 1393 | workflowID: workflowID, |
| 1394 | updatedAt: updatedAt, |
| 1395 | }, nil |
| 1396 | }) |
| 1397 | if err != nil { |
| 1398 | logger.Error("failed to process result of all flows", slog.Any("error", err)) |
| 1399 | return fmt.Errorf("failed to process result of all flows for metrics: %w", err) |
| 1400 | } |
| 1401 | |
| 1402 | normalizeLagCtx, normalizeLagCancel := context.WithTimeout(ctx, 10*time.Second) |
no test coverage detected