MCPcopy Index your code
hub / github.com/PeerDB-io/peerdb / dropFlow

Method dropFlow

flow/cmd/handler.go:301–344  ·  view source on GitHub ↗
(
	ctx context.Context,
	flowJobName string,
	deleteStats bool,
)

Source from the content-addressed store, hash-verified

299}
300
301func (h *FlowRequestHandler) dropFlow(
302 ctx context.Context,
303 flowJobName string,
304 deleteStats bool,
305) error {
306 logs := slog.Group("shutdown-log", slog.String(string(shared.FlowNameKey), flowJobName))
307
308 isCdc, err := h.isCDCFlow(ctx, flowJobName)
309 if err != nil {
310 slog.ErrorContext(ctx, "unable to check if workflow is cdc", logs, slog.Any("error", err))
311 return fmt.Errorf("unable to determine if workflow is cdc: %w", err)
312 }
313 var cdcConfig *protos.FlowConnectionConfigs
314 if isCdc {
315 cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName)
316 if err != nil {
317 slog.ErrorContext(ctx, "unable to get cdc config from catalog", logs, slog.Any("error", err))
318 return fmt.Errorf("unable to get cdc config from catalog: %w", err)
319 }
320 }
321
322 dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New())
323 workflowOptions := client.StartWorkflowOptions{
324 ID: dropFlowWorkflowID,
325 TaskQueue: h.peerflowTaskQueueID,
326 TypedSearchAttributes: shared.NewSearchAttributes(flowJobName),
327 }
328
329 if dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{
330 FlowJobName: flowJobName,
331 DropFlowStats: deleteStats,
332 FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0),
333 SkipDestinationDrop: true,
334 SkipSourceDrop: true,
335 }); err != nil {
336 slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err))
337 return fmt.Errorf("unable to start DropFlow workflow: %w", err)
338 } else if err := dropFlowHandle.Get(ctx, nil); err != nil {
339 slog.ErrorContext(ctx, "DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
340 return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
341 }
342
343 return nil
344}
345
346func (h *FlowRequestHandler) shutdownFlow(
347 ctx context.Context,

Callers 1

FlowStateChangeMethod · 0.95

Calls 5

isCDCFlowMethod · 0.95
ExecuteWorkflowMethod · 0.80
StringMethod · 0.45
GetMethod · 0.45

Tested by

no test coverage detected