MCPcopy
hub / github.com/PeerDB-io/peerdb / cloneTables

Method cloneTables

flow/workflows/snapshot_flow.go:231–292  ·  view source on GitHub ↗
(
	ctx workflow.Context,
	snapshotType snapshotType,
	slotName string,
	snapshotName string,
	stagingPathOverride string,
	maxParallelClones int,
)

Source from the content-addressed store, hash-verified

229}
230
231func (s *SnapshotFlowExecution) cloneTables(
232 ctx workflow.Context,
233 snapshotType snapshotType,
234 slotName string,
235 snapshotName string,
236 stagingPathOverride string,
237 maxParallelClones int,
238) error {
239 if snapshotType == SNAPSHOT_TYPE_SLOT {
240 s.logger.Info("cloning tables for slot", slog.String("slot", slotName), slog.String("snapshot", snapshotName))
241 } else if snapshotType == SNAPSHOT_TYPE_TX {
242 s.logger.Info("cloning tables in tx snapshot mode", slog.String("snapshot", snapshotName))
243 }
244
245 getParallelLoadKeyForTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
246 StartToCloseTimeout: 10 * time.Minute,
247 RetryPolicy: &temporal.RetryPolicy{
248 InitialInterval: 1 * time.Minute,
249 },
250 })
251
252 var res *protos.GetDefaultPartitionKeyForTablesOutput
253 if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx,
254 snapshot.GetDefaultPartitionKeyForTables, s.config).Get(ctx, &res); err != nil {
255 return fmt.Errorf("failed to get default partition keys for tables: %w", err)
256 }
257
258 boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
259
260 sourcePeerType, err := getPeerType(ctx, s.config.SourceName)
261 if err != nil {
262 return err
263 }
264 destinationPeerType, err := getPeerType(ctx, s.config.DestinationName)
265 if err != nil {
266 return err
267 }
268
269 for _, v := range s.config.TableMappings {
270 source := v.SourceTableIdentifier
271 destination := v.DestinationTableIdentifier
272 s.logger.Info(
273 fmt.Sprintf("Cloning table with source table %s and destination table name %s", source, destination),
274 slog.String("snapshotName", snapshotName),
275 )
276 if v.PartitionKey == "" {
277 v.PartitionKey = res.TableDefaultPartitionKeyMapping[source]
278 }
279 if err := s.cloneTable(ctx, boundSelector, snapshotName, stagingPathOverride, v, sourcePeerType, destinationPeerType); err != nil {
280 s.logger.Error("failed to start clone child workflow", slog.Any("error", err))
281 return err
282 }
283 }
284
285 if err := boundSelector.Wait(ctx); err != nil {
286 s.logger.Error("failed to clone some tables", slog.Any("error", err))
287 return err
288 }

Callers 2

cloneTablesWithSlotMethod · 0.95
SnapshotFlowWorkflowFunction · 0.95

Calls 8

cloneTableMethod · 0.95
WaitMethod · 0.95
InfoMethod · 0.80
ExecuteActivityMethod · 0.80
getPeerTypeFunction · 0.70
StringMethod · 0.45
GetMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected