( ctx workflow.Context, snapshotType snapshotType, slotName string, snapshotName string, stagingPathOverride string, maxParallelClones int, )
| 229 | } |
| 230 | |
| 231 | func (s *SnapshotFlowExecution) cloneTables( |
| 232 | ctx workflow.Context, |
| 233 | snapshotType snapshotType, |
| 234 | slotName string, |
| 235 | snapshotName string, |
| 236 | stagingPathOverride string, |
| 237 | maxParallelClones int, |
| 238 | ) error { |
| 239 | if snapshotType == SNAPSHOT_TYPE_SLOT { |
| 240 | s.logger.Info("cloning tables for slot", slog.String("slot", slotName), slog.String("snapshot", snapshotName)) |
| 241 | } else if snapshotType == SNAPSHOT_TYPE_TX { |
| 242 | s.logger.Info("cloning tables in tx snapshot mode", slog.String("snapshot", snapshotName)) |
| 243 | } |
| 244 | |
| 245 | getParallelLoadKeyForTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ |
| 246 | StartToCloseTimeout: 10 * time.Minute, |
| 247 | RetryPolicy: &temporal.RetryPolicy{ |
| 248 | InitialInterval: 1 * time.Minute, |
| 249 | }, |
| 250 | }) |
| 251 | |
| 252 | var res *protos.GetDefaultPartitionKeyForTablesOutput |
| 253 | if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx, |
| 254 | snapshot.GetDefaultPartitionKeyForTables, s.config).Get(ctx, &res); err != nil { |
| 255 | return fmt.Errorf("failed to get default partition keys for tables: %w", err) |
| 256 | } |
| 257 | |
| 258 | boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones) |
| 259 | |
| 260 | sourcePeerType, err := getPeerType(ctx, s.config.SourceName) |
| 261 | if err != nil { |
| 262 | return err |
| 263 | } |
| 264 | destinationPeerType, err := getPeerType(ctx, s.config.DestinationName) |
| 265 | if err != nil { |
| 266 | return err |
| 267 | } |
| 268 | |
| 269 | for _, v := range s.config.TableMappings { |
| 270 | source := v.SourceTableIdentifier |
| 271 | destination := v.DestinationTableIdentifier |
| 272 | s.logger.Info( |
| 273 | fmt.Sprintf("Cloning table with source table %s and destination table name %s", source, destination), |
| 274 | slog.String("snapshotName", snapshotName), |
| 275 | ) |
| 276 | if v.PartitionKey == "" { |
| 277 | v.PartitionKey = res.TableDefaultPartitionKeyMapping[source] |
| 278 | } |
| 279 | if err := s.cloneTable(ctx, boundSelector, snapshotName, stagingPathOverride, v, sourcePeerType, destinationPeerType); err != nil { |
| 280 | s.logger.Error("failed to start clone child workflow", slog.Any("error", err)) |
| 281 | return err |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | if err := boundSelector.Wait(ctx); err != nil { |
| 286 | s.logger.Error("failed to clone some tables", slog.Any("error", err)) |
| 287 | return err |
| 288 | } |
no test coverage detected