MCPcopy
hub / github.com/PeerDB-io/peerdb / cloneTablesWithSlot

Method cloneTablesWithSlot

flow/workflows/snapshot_flow.go:294–330  ·  view source on GitHub ↗
(
	ctx workflow.Context,
	sessionCtx workflow.Context,
	numTablesInParallel int,
)

Source from the content-addressed store, hash-verified

292}
293
294func (s *SnapshotFlowExecution) cloneTablesWithSlot(
295 ctx workflow.Context,
296 sessionCtx workflow.Context,
297 numTablesInParallel int,
298) error {
299 slotInfo, err := s.setupReplication(sessionCtx)
300 if err != nil {
301 return fmt.Errorf("failed to setup replication: %w", err)
302 }
303 defer func() {
304 dCtx, cancel := workflow.NewDisconnectedContext(sessionCtx)
305 defer cancel()
306 if err := s.closeSlotKeepAlive(dCtx); err != nil {
307 s.logger.Error("failed to close slot keep alive", slog.Any("error", err))
308 }
309 }()
310 var slotName string
311 var snapshotName string
312 if slotInfo != nil {
313 slotName = slotInfo.SlotName
314 snapshotName = slotInfo.SnapshotName
315 }
316
317 s.logger.Info("cloning tables in parallel", slog.Int("parallelism", numTablesInParallel))
318 if err := s.cloneTables(ctx,
319 SNAPSHOT_TYPE_SLOT,
320 slotName,
321 snapshotName,
322 "",
323 numTablesInParallel,
324 ); err != nil {
325 s.logger.Error("failed to clone tables", slog.Any("error", err))
326 return fmt.Errorf("failed to clone tables: %w", err)
327 }
328
329 return nil
330}
331
332func SnapshotFlowWorkflow(
333 ctx workflow.Context,

Callers 1

SnapshotFlowWorkflowFunction · 0.95

Calls 5

setupReplicationMethod · 0.95
closeSlotKeepAliveMethod · 0.95
cloneTablesMethod · 0.95
InfoMethod · 0.80
ErrorMethod · 0.45

Tested by

no test coverage detected