(ctx context.Context, config *protos.QRepConfig, partitions *protos.QRepPartitionBatch, runUUID string, )
| 617 | } |
| 618 | |
| 619 | func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, |
| 620 | config *protos.QRepConfig, |
| 621 | partitions *protos.QRepPartitionBatch, |
| 622 | runUUID string, |
| 623 | ) error { |
| 624 | shutdown := common.HeartbeatRoutine(ctx, func() string { |
| 625 | return "replicating partitions for job" |
| 626 | }) |
| 627 | defer shutdown() |
| 628 | |
| 629 | ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) |
| 630 | logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) |
| 631 | |
| 632 | if err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID); err != nil { |
| 633 | return fmt.Errorf("failed to update start time for qrep run: %w", err) |
| 634 | } |
| 635 | |
| 636 | numPartitions := len(partitions.Partitions) |
| 637 | logger.Info("replicating partitions for batch", |
| 638 | slog.String("table", config.WatermarkTable), |
| 639 | slog.Int64("batchID", int64(partitions.BatchId)), |
| 640 | slog.Int("totalPartitions", numPartitions)) |
| 641 | |
| 642 | qRepPullCoreConn, qRepPullCoreClose, err := connectors.GetByNameAs[connectors.QRepPullConnectorCore]( |
| 643 | ctx, config.Env, a.CatalogPool, config.SourceName) |
| 644 | if err != nil { |
| 645 | return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep source connector: %w", err)) |
| 646 | } |
| 647 | defer qRepPullCoreClose(ctx) |
| 648 | |
| 649 | dstPeer, qRepSyncCoreConn, qRepSyncCoreClose, err := connectors.LoadPeerAndGetByNameAs[connectors.QRepSyncConnectorCore]( |
| 650 | ctx, |
| 651 | config.Env, |
| 652 | a.CatalogPool, |
| 653 | config.DestinationName, |
| 654 | ) |
| 655 | if err != nil { |
| 656 | return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep destination connector: %w", err)) |
| 657 | } |
| 658 | defer qRepSyncCoreClose(ctx) |
| 659 | |
| 660 | replicatePartitionFunc, err := initializeReplicatePartitionFunc(ctx, a, config, runUUID, dstPeer.Type, qRepPullCoreConn, qRepSyncCoreConn) |
| 661 | if err != nil { |
| 662 | logger.Error("failed to initialize replication method", slog.Any("error", err)) |
| 663 | return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) |
| 664 | } |
| 665 | |
| 666 | for i, partition := range partitions.Partitions { |
| 667 | partLogger := log.With(logger, |
| 668 | slog.Int64("batchID", int64(partitions.BatchId)), |
| 669 | slog.String("partitionId", partition.PartitionId), |
| 670 | slog.String("table", config.WatermarkTable), |
| 671 | slog.Int("partitionNum", i+1), |
| 672 | slog.Int("totalPartitions", numPartitions)) |
| 673 | |
| 674 | startTime := time.Now() |
| 675 | partLogger.Info(fmt.Sprintf("start replicating partition %d/%d of table %s", i+1, numPartitions, config.WatermarkTable)) |
| 676 |
nothing calls this directly
no test coverage detected