MCPcopy
hub / github.com/PeerDB-io/peerdb / shutdownFlow

Method shutdownFlow

flow/cmd/handler.go:346–421  ·  view source on GitHub ↗
(
	ctx context.Context,
	flowJobName string,
	deleteStats bool,
	skipDestinationDrop bool,
)

Source from the content-addressed store, hash-verified

344}
345
346func (h *FlowRequestHandler) shutdownFlow(
347 ctx context.Context,
348 flowJobName string,
349 deleteStats bool,
350 skipDestinationDrop bool,
351) error {
352 workflowID, err := h.getWorkflowID(ctx, flowJobName)
353 if err != nil {
354 return err
355 }
356
357 logs := slog.Group("shutdown-log",
358 slog.String(string(shared.FlowNameKey), flowJobName),
359 slog.String("workflowId", workflowID),
360 )
361
362 if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
363 slog.ErrorContext(ctx, "unable to cancel workflow", logs, slog.Any("error", err))
364 return fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err)
365 }
366
367 isCdc, err := h.isCDCFlow(ctx, flowJobName)
368 if err != nil {
369 slog.ErrorContext(ctx, "unable to check if workflow is cdc", logs, slog.Any("error", err))
370 return fmt.Errorf("unable to determine if workflow is cdc: %w", err)
371 }
372 var cdcConfig *protos.FlowConnectionConfigs
373 if isCdc {
374 cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName)
375 if err != nil {
376 slog.ErrorContext(ctx, "unable to get cdc config from catalog", logs, slog.Any("error", err))
377 return fmt.Errorf("unable to get cdc config from catalog: %w", err)
378 }
379 }
380 dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
381 workflowOptions := client.StartWorkflowOptions{
382 ID: dropFlowWorkflowID,
383 TaskQueue: h.peerflowTaskQueueID,
384 TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
385 }
386
387 dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{
388 FlowJobName: flowJobName,
389 DropFlowStats: deleteStats,
390 FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0),
391 SkipDestinationDrop: skipDestinationDrop,
392 // NOTE: Resync is false here during snapshot-only resync
393 })
394 if err != nil {
395 slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err))
396 return fmt.Errorf("unable to start DropFlow workflow: %w", err)
397 }
398
399 cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
400 defer cancel()
401
402 errLatch := concurrency.NewLatch[error]()
403 go func() {

Callers 2

FlowStateChange · Method · 0.95

Calls 10

getWorkflowID · Method · 0.95
handleCancelWorkflow · Method · 0.95
isCDCFlow · Method · 0.95
ExecuteWorkflow · Method · 0.80
Chan · Method · 0.80
String · Method · 0.45
Set · Method · 0.45
Get · Method · 0.45
Wait · Method · 0.45

Tested by

no test coverage detected