MCPcopy
hub / github.com/PeerDB-io/peerdb / DropFlowWorkflow

Function DropFlowWorkflow

flow/workflows/drop_flow.go:126–276  ·  view source on GitHub ↗
(ctx workflow.Context, input *protos.DropFlowInput)

Source from the content-addressed store, hash-verified

124}
125
126func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error {
127 activeSignal := model.NoopSignal
128 if input.Resync {
129 activeSignal = model.ResyncSignal
130 } else {
131 activeSignal = model.TerminateSignal
132 }
133
134 if err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (cdc_state.CDCFlowWorkflowState, error) {
135 state := cdc_state.CDCFlowWorkflowState{DropFlowInput: input}
136 if input.Resync {
137 state.CurrentFlowStatus = protos.FlowStatus_STATUS_RESYNC
138 state.ActiveSignal = activeSignal
139 } else {
140 state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATING
141 state.ActiveSignal = activeSignal
142 }
143 return state, nil
144 }); err != nil {
145 return fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err)
146 }
147
148 logger := workflow.GetLogger(ctx)
149
150 finalDrop := false
151 if input.Resync {
152 flowSignalChan := model.FlowSignal.GetSignalChannel(ctx)
153 flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx)
154 workflow.Go(ctx, func(gCtx workflow.Context) {
155 sigSelector := workflow.NewNamedSelector(gCtx, input.FlowJobName+"-drop-signals")
156 sigSelector.AddReceive(gCtx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
157
158 flowSignalChan.AddToSelector(sigSelector, func(val model.CDCFlowSignal, _ bool) {
159 prev := activeSignal
160 activeSignal = model.FlowSignalHandler(activeSignal, val, logger)
161 if prev != model.TerminateSignal && activeSignal == model.TerminateSignal {
162 finalDrop = true
163 }
164 })
165
166 flowSignalStateChangeChan.AddToSelector(sigSelector, func(req *protos.FlowStateChangeRequest, _ bool) {
167 if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATING {
168 activeSignal = model.TerminateSignal
169 input.DropFlowStats = req.DropMirrorStats
170 input.SkipDestinationDrop = req.SkipDestinationDrop
171 finalDrop = true
172 }
173 })
174
175 for gCtx.Err() == nil {
176 sigSelector.Select(gCtx)
177 }
178 })
179 }
180
181 status := protos.FlowStatus_STATUS_TERMINATING
182 if input.Resync {
183 status = protos.FlowStatus_STATUS_RESYNC

Callers

nothing calls this directly

Calls 10

GetFlowMetadataContextFunction · 0.85
executeCDCDropActivitiesFunction · 0.85
GetSignalChannelMethod · 0.80
AddToSelectorMethod · 0.80
InfoMethod · 0.80
ExecuteActivityMethod · 0.80
ErrMethod · 0.65
StringMethod · 0.45
GetMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected