MCPcopy
hub / github.com/PeerDB-io/peerdb / cloneTable

Method cloneTable

flow/workflows/snapshot_flow.go:102–229  ·  view source on GitHub ↗
(
	ctx workflow.Context,
	boundSelector *shared.BoundSelector,
	snapshotName string,
	stagingPathOverride string,
	mapping *protos.TableMapping,
	sourcePeerType protos.DBType,
	destinationPeerType protos.DBType,
)

Source from the content-addressed store, hash-verified

100}
101
102func (s *SnapshotFlowExecution) cloneTable(
103 ctx workflow.Context,
104 boundSelector *shared.BoundSelector,
105 snapshotName string,
106 stagingPathOverride string,
107 mapping *protos.TableMapping,
108 sourcePeerType protos.DBType,
109 destinationPeerType protos.DBType,
110) error {
111 flowName := s.config.FlowJobName
112 cloneLog := slog.Group("clone-log",
113 slog.String(string(shared.FlowNameKey), flowName),
114 slog.String("snapshotName", snapshotName))
115
116 srcName := mapping.SourceTableIdentifier
117 dstName := mapping.DestinationTableIdentifier
118 originalRunID := workflow.GetInfo(ctx).OriginalRunID
119
120 childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, srcName, originalRunID)
121 childWorkflowID = shared.ReplaceIllegalCharactersWithUnderscores(childWorkflowID)
122
123 s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s",
124 childWorkflowID, srcName, dstName), cloneLog)
125
126 taskQueue := internal.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
127 childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
128 WorkflowID: childWorkflowID,
129 WorkflowTaskTimeout: 5 * time.Minute,
130 TaskQueue: taskQueue,
131 RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
132 })
133
134 var tableSchema *protos.TableSchema
135 initTableSchema := func() error {
136 if tableSchema != nil {
137 return nil
138 }
139
140 schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
141 StartToCloseTimeout: time.Minute,
142 WaitForCancellation: true,
143 RetryPolicy: &temporal.RetryPolicy{
144 InitialInterval: 1 * time.Minute,
145 },
146 })
147 return workflow.ExecuteActivity(
148 schemaCtx,
149 snapshot.LoadTableSchema,
150 s.config.FlowJobName,
151 dstName,
152 ).Get(ctx, &tableSchema)
153 }
154
155 numWorkers := uint32(8)
156 if s.config.SnapshotMaxParallelWorkers > 0 {
157 numWorkers = s.config.SnapshotMaxParallelWorkers
158 }
159

Callers 1

cloneTablesMethod · 0.95

Calls 5

InfoMethod · 0.80
ExecuteActivityMethod · 0.80
SpawnChildMethod · 0.80
StringMethod · 0.45
GetMethod · 0.45

Tested by

no test coverage detected