MCPcopy
hub / github.com/PeerDB-io/peerdb / RecordSlotSizes

Method RecordSlotSizes

flow/activities/flowable.go:1345–1437  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

1343}
1344
1345func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
1346 logger := log.With(internal.LoggerFromCtx(ctx), slog.String("scheduledTask", "RecordSlotSizes"))
1347 logger.Info("Recording Slot Information")
1348 slotMetricGauges := otel_metrics.SlotMetricGauges{}
1349 slotMetricGauges.SlotLagGauge = a.OtelManager.Metrics.SlotLagGauge
1350 slotMetricGauges.RestartLSNGauge = a.OtelManager.Metrics.RestartLSNGauge
1351 slotMetricGauges.ConfirmedFlushLSNGauge = a.OtelManager.Metrics.ConfirmedFlushLSNGauge
1352 slotMetricGauges.SentLSNGauge = a.OtelManager.Metrics.SentLSNGauge
1353 slotMetricGauges.CurrentWalLSNGauge = a.OtelManager.Metrics.CurrentWalLSNGauge
1354 slotMetricGauges.RestartToConfirmedMBGauge = a.OtelManager.Metrics.RestartToConfirmedMBGauge
1355 slotMetricGauges.ConfirmedToCurrentMBGauge = a.OtelManager.Metrics.ConfirmedToCurrentMBGauge
1356 slotMetricGauges.SafeWalSizeGauge = a.OtelManager.Metrics.SafeWalSizeGauge
1357 slotMetricGauges.SlotActiveGauge = a.OtelManager.Metrics.SlotActiveGauge
1358 slotMetricGauges.WalSenderStateGauge = a.OtelManager.Metrics.WalSenderStateGauge
1359 slotMetricGauges.WalStatusGauge = a.OtelManager.Metrics.WalStatusGauge
1360 slotMetricGauges.LogicalDecodingWorkMemGauge = a.OtelManager.Metrics.LogicalDecodingWorkMemGauge
1361 slotMetricGauges.StatsResetGauge = a.OtelManager.Metrics.StatsResetGauge
1362 slotMetricGauges.SpillTxnsGauge = a.OtelManager.Metrics.SpillTxnsGauge
1363 slotMetricGauges.SpillCountGauge = a.OtelManager.Metrics.SpillCountGauge
1364 slotMetricGauges.SpillBytesGauge = a.OtelManager.Metrics.SpillBytesGauge
1365 slotMetricGauges.OpenConnectionsGauge = a.OtelManager.Metrics.OpenConnectionsGauge
1366 slotMetricGauges.OpenReplicationConnectionsGauge = a.OtelManager.Metrics.OpenReplicationConnectionsGauge
1367 slotMetricGauges.IntervalSinceLastNormalizeGauge = a.OtelManager.Metrics.IntervalSinceLastNormalizeGauge
1368
1369 logger.Info("Querying for flows to emit slot metrics")
1370 rows, err := a.CatalogPool.Query(ctx,
1371 "SELECT DISTINCT ON (name) name, config_proto, workflow_id, updated_at FROM flows WHERE query_string IS NULL")
1372 if err != nil {
1373 logger.Error("failed to query all flows", slog.Any("error", err))
1374 return fmt.Errorf("failed to query all flows for metrics: %w", err)
1375 }
1376
1377 infos, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (flowInformation, error) {
1378 var flowName string
1379 var configProto []byte
1380 var workflowID string
1381 var updatedAt time.Time
1382 if err := rows.Scan(&flowName, &configProto, &workflowID, &updatedAt); err != nil {
1383 return flowInformation{}, err
1384 }
1385
1386 var config protos.FlowConnectionConfigsCore
1387 if err := proto.Unmarshal(configProto, &config); err != nil {
1388 return flowInformation{}, err
1389 }
1390
1391 return flowInformation{
1392 config: &config,
1393 workflowID: workflowID,
1394 updatedAt: updatedAt,
1395 }, nil
1396 })
1397 if err != nil {
1398 logger.Error("failed to process result of all flows", slog.Any("error", err))
1399 return fmt.Errorf("failed to process result of all flows for metrics: %w", err)
1400 }
1401
1402 normalizeLagCtx, normalizeLagCancel := context.WithTimeout(ctx, 10*time.Second)

Callers 1

ScheduledTasksMethod · 0.95

Calls 10

recordSlotInformationMethod · 0.95
emitLogRetentionHoursMethod · 0.95
InfoMethod · 0.80
QueryMethod · 0.65
ErrMethod · 0.65
StringMethod · 0.45
ErrorMethod · 0.45
ScanMethod · 0.45
RecordMethod · 0.45

Tested by

no test coverage detected