(ctx workflow.Context, input *protos.DropFlowInput)
| 124 | } |
| 125 | |
| 126 | func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { |
| 127 | activeSignal := model.NoopSignal |
| 128 | if input.Resync { |
| 129 | activeSignal = model.ResyncSignal |
| 130 | } else { |
| 131 | activeSignal = model.TerminateSignal |
| 132 | } |
| 133 | |
| 134 | if err := workflow.SetQueryHandler(ctx, shared.CDCFlowStateQuery, func() (cdc_state.CDCFlowWorkflowState, error) { |
| 135 | state := cdc_state.CDCFlowWorkflowState{DropFlowInput: input} |
| 136 | if input.Resync { |
| 137 | state.CurrentFlowStatus = protos.FlowStatus_STATUS_RESYNC |
| 138 | state.ActiveSignal = activeSignal |
| 139 | } else { |
| 140 | state.CurrentFlowStatus = protos.FlowStatus_STATUS_TERMINATING |
| 141 | state.ActiveSignal = activeSignal |
| 142 | } |
| 143 | return state, nil |
| 144 | }); err != nil { |
| 145 | return fmt.Errorf("failed to set `%s` query handler: %w", shared.CDCFlowStateQuery, err) |
| 146 | } |
| 147 | |
| 148 | logger := workflow.GetLogger(ctx) |
| 149 | |
| 150 | finalDrop := false |
| 151 | if input.Resync { |
| 152 | flowSignalChan := model.FlowSignal.GetSignalChannel(ctx) |
| 153 | flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx) |
| 154 | workflow.Go(ctx, func(gCtx workflow.Context) { |
| 155 | sigSelector := workflow.NewNamedSelector(gCtx, input.FlowJobName+"-drop-signals") |
| 156 | sigSelector.AddReceive(gCtx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) |
| 157 | |
| 158 | flowSignalChan.AddToSelector(sigSelector, func(val model.CDCFlowSignal, _ bool) { |
| 159 | prev := activeSignal |
| 160 | activeSignal = model.FlowSignalHandler(activeSignal, val, logger) |
| 161 | if prev != model.TerminateSignal && activeSignal == model.TerminateSignal { |
| 162 | finalDrop = true |
| 163 | } |
| 164 | }) |
| 165 | |
| 166 | flowSignalStateChangeChan.AddToSelector(sigSelector, func(req *protos.FlowStateChangeRequest, _ bool) { |
| 167 | if req.RequestedFlowState == protos.FlowStatus_STATUS_TERMINATING { |
| 168 | activeSignal = model.TerminateSignal |
| 169 | input.DropFlowStats = req.DropMirrorStats |
| 170 | input.SkipDestinationDrop = req.SkipDestinationDrop |
| 171 | finalDrop = true |
| 172 | } |
| 173 | }) |
| 174 | |
| 175 | for gCtx.Err() == nil { |
| 176 | sigSelector.Select(gCtx) |
| 177 | } |
| 178 | }) |
| 179 | } |
| 180 | |
| 181 | status := protos.FlowStatus_STATUS_TERMINATING |
| 182 | if input.Resync { |
| 183 | status = protos.FlowStatus_STATUS_RESYNC |
nothing calls this directly
no test coverage detected