( ctx workflow.Context, sessionCtx workflow.Context, numTablesInParallel int, )
| 292 | } |
| 293 | |
| 294 | func (s *SnapshotFlowExecution) cloneTablesWithSlot( |
| 295 | ctx workflow.Context, |
| 296 | sessionCtx workflow.Context, |
| 297 | numTablesInParallel int, |
| 298 | ) error { |
| 299 | slotInfo, err := s.setupReplication(sessionCtx) |
| 300 | if err != nil { |
| 301 | return fmt.Errorf("failed to setup replication: %w", err) |
| 302 | } |
| 303 | defer func() { |
| 304 | dCtx, cancel := workflow.NewDisconnectedContext(sessionCtx) |
| 305 | defer cancel() |
| 306 | if err := s.closeSlotKeepAlive(dCtx); err != nil { |
| 307 | s.logger.Error("failed to close slot keep alive", slog.Any("error", err)) |
| 308 | } |
| 309 | }() |
| 310 | var slotName string |
| 311 | var snapshotName string |
| 312 | if slotInfo != nil { |
| 313 | slotName = slotInfo.SlotName |
| 314 | snapshotName = slotInfo.SnapshotName |
| 315 | } |
| 316 | |
| 317 | s.logger.Info("cloning tables in parallel", slog.Int("parallelism", numTablesInParallel)) |
| 318 | if err := s.cloneTables(ctx, |
| 319 | SNAPSHOT_TYPE_SLOT, |
| 320 | slotName, |
| 321 | snapshotName, |
| 322 | "", |
| 323 | numTablesInParallel, |
| 324 | ); err != nil { |
| 325 | s.logger.Error("failed to clone tables", slog.Any("error", err)) |
| 326 | return fmt.Errorf("failed to clone tables: %w", err) |
| 327 | } |
| 328 | |
| 329 | return nil |
| 330 | } |
| 331 | |
| 332 | func SnapshotFlowWorkflow( |
| 333 | ctx workflow.Context, |
no test coverage detected