( ctx workflow.Context, boundSelector *shared.BoundSelector, snapshotName string, stagingPathOverride string, mapping *protos.TableMapping, sourcePeerType protos.DBType, destinationPeerType protos.DBType, )
| 100 | } |
| 101 | |
| 102 | func (s *SnapshotFlowExecution) cloneTable( |
| 103 | ctx workflow.Context, |
| 104 | boundSelector *shared.BoundSelector, |
| 105 | snapshotName string, |
| 106 | stagingPathOverride string, |
| 107 | mapping *protos.TableMapping, |
| 108 | sourcePeerType protos.DBType, |
| 109 | destinationPeerType protos.DBType, |
| 110 | ) error { |
| 111 | flowName := s.config.FlowJobName |
| 112 | cloneLog := slog.Group("clone-log", |
| 113 | slog.String(string(shared.FlowNameKey), flowName), |
| 114 | slog.String("snapshotName", snapshotName)) |
| 115 | |
| 116 | srcName := mapping.SourceTableIdentifier |
| 117 | dstName := mapping.DestinationTableIdentifier |
| 118 | originalRunID := workflow.GetInfo(ctx).OriginalRunID |
| 119 | |
| 120 | childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, srcName, originalRunID) |
| 121 | childWorkflowID = shared.ReplaceIllegalCharactersWithUnderscores(childWorkflowID) |
| 122 | |
| 123 | s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s", |
| 124 | childWorkflowID, srcName, dstName), cloneLog) |
| 125 | |
| 126 | taskQueue := internal.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue) |
| 127 | childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ |
| 128 | WorkflowID: childWorkflowID, |
| 129 | WorkflowTaskTimeout: 5 * time.Minute, |
| 130 | TaskQueue: taskQueue, |
| 131 | RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, |
| 132 | }) |
| 133 | |
| 134 | var tableSchema *protos.TableSchema |
| 135 | initTableSchema := func() error { |
| 136 | if tableSchema != nil { |
| 137 | return nil |
| 138 | } |
| 139 | |
| 140 | schemaCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ |
| 141 | StartToCloseTimeout: time.Minute, |
| 142 | WaitForCancellation: true, |
| 143 | RetryPolicy: &temporal.RetryPolicy{ |
| 144 | InitialInterval: 1 * time.Minute, |
| 145 | }, |
| 146 | }) |
| 147 | return workflow.ExecuteActivity( |
| 148 | schemaCtx, |
| 149 | snapshot.LoadTableSchema, |
| 150 | s.config.FlowJobName, |
| 151 | dstName, |
| 152 | ).Get(ctx, &tableSchema) |
| 153 | } |
| 154 | |
| 155 | numWorkers := uint32(8) |
| 156 | if s.config.SnapshotMaxParallelWorkers > 0 { |
| 157 | numWorkers = s.config.SnapshotMaxParallelWorkers |
| 158 | } |
| 159 |
no test coverage detected