MCPcopy
hub / github.com/PeerDB-io/peerdb / ReplicateQRepPartitions

Method ReplicateQRepPartitions

flow/activities/flowable.go:619–694  ·  view source on GitHub ↗
(ctx context.Context,
	config *protos.QRepConfig,
	partitions *protos.QRepPartitionBatch,
	runUUID string,
)

Source from the content-addressed store, hash-verified

617}
618
619func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
620 config *protos.QRepConfig,
621 partitions *protos.QRepPartitionBatch,
622 runUUID string,
623) error {
624 shutdown := common.HeartbeatRoutine(ctx, func() string {
625 return "replicating partitions for job"
626 })
627 defer shutdown()
628
629 ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
630 logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
631
632 if err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID); err != nil {
633 return fmt.Errorf("failed to update start time for qrep run: %w", err)
634 }
635
636 numPartitions := len(partitions.Partitions)
637 logger.Info("replicating partitions for batch",
638 slog.String("table", config.WatermarkTable),
639 slog.Int64("batchID", int64(partitions.BatchId)),
640 slog.Int("totalPartitions", numPartitions))
641
642 qRepPullCoreConn, qRepPullCoreClose, err := connectors.GetByNameAs[connectors.QRepPullConnectorCore](
643 ctx, config.Env, a.CatalogPool, config.SourceName)
644 if err != nil {
645 return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep source connector: %w", err))
646 }
647 defer qRepPullCoreClose(ctx)
648
649 dstPeer, qRepSyncCoreConn, qRepSyncCoreClose, err := connectors.LoadPeerAndGetByNameAs[connectors.QRepSyncConnectorCore](
650 ctx,
651 config.Env,
652 a.CatalogPool,
653 config.DestinationName,
654 )
655 if err != nil {
656 return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep destination connector: %w", err))
657 }
658 defer qRepSyncCoreClose(ctx)
659
660 replicatePartitionFunc, err := initializeReplicatePartitionFunc(ctx, a, config, runUUID, dstPeer.Type, qRepPullCoreConn, qRepSyncCoreConn)
661 if err != nil {
662 logger.Error("failed to initialize replication method", slog.Any("error", err))
663 return a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
664 }
665
666 for i, partition := range partitions.Partitions {
667 partLogger := log.With(logger,
668 slog.Int64("batchID", int64(partitions.BatchId)),
669 slog.String("partitionId", partition.PartitionId),
670 slog.String("table", config.WatermarkTable),
671 slog.Int("partitionNum", i+1),
672 slog.Int("totalPartitions", numPartitions))
673
674 startTime := time.Now()
675 partLogger.Info(fmt.Sprintf("start replicating partition %d/%d of table %s", i+1, numPartitions, config.WatermarkTable))
676

Callers

nothing calls this directly

Calls 6

InfoMethod · 0.80
LogFlowErrorMethod · 0.80
StringMethod · 0.45
ErrorMethod · 0.45
LogFlowInfoMethod · 0.45

Tested by

no test coverage detected