MCPcopy
hub / github.com/PeerDB-io/peerdb / CreateCDCFlow

Method CreateCDCFlow

flow/cmd/handler.go:158–221  ·  view source on GitHub ↗
(
	ctx context.Context, req *protos.CreateCDCFlowRequest,
)

Source from the content-addressed store, hash-verified

156}
157
158func (h *FlowRequestHandler) CreateCDCFlow(
159 ctx context.Context, req *protos.CreateCDCFlowRequest,
160) (*protos.CreateCDCFlowResponse, APIError) {
161 cfg := req.ConnectionConfigs
162 if cfg == nil {
163 return nil, NewInvalidArgumentApiError(fmt.Errorf("connection configs cannot be nil"))
164 }
165 if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil {
166 return nil, NewInternalApiError(err)
167 } else {
168 cfg.Version = internalVersion
169 }
170 if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil {
171 return nil, NewInternalApiError(err)
172 } else {
173 cfg.Flags = flags
174 }
175
176 if !req.AttachToExisting {
177 if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil {
178 return nil, NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err))
179 } else if exists {
180 return nil, NewAlreadyExistsApiError(fmt.Errorf("flow already exists: %s", cfg.FlowJobName))
181 }
182 }
183
184 workflowID := getWorkflowID(cfg.FlowJobName)
185 desc, err := h.temporalClient.DescribeWorkflow(ctx, workflowID, "")
186 if _, ok := errors.AsType[*serviceerror.NotFound](err); err != nil && !ok {
187 return nil, NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err))
188 } else if err == nil {
189 // If workflow is actively running, handle based on AttachToExisting
190 // Workflows in terminal states are fine
191 if desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_RUNNING ||
192 desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW {
193 if req.AttachToExisting {
194 // Idempotent attach to running workflow
195 return &protos.CreateCDCFlowResponse{
196 WorkflowId: workflowID,
197 }, nil
198 } else {
199 // Can't create duplicate of running workflow
200 return nil, NewAlreadyExistsApiError(fmt.Errorf("workflow already exists for flow: %s", cfg.FlowJobName))
201 }
202 }
203 }
204 // No running workflow, do the validations and start a new one
205
206 // Use idempotent validation that skips mirror existence check
207 connectionConfigsCore := pconv.FlowConnectionConfigsToCore(req.ConnectionConfigs, 0)
208 if connectionConfigsCore.SkipValidation == nil || !*connectionConfigsCore.SkipValidation {
209 if _, err := h.validateCDCMirrorImpl(ctx, connectionConfigsCore, true); err != nil {
210 slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err))
211 return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err))
212 }
213 }
214
215 if resp, err := h.createCDCFlow(ctx, connectionConfigsCore, workflowID); err != nil {

Calls 8

determineFlags · Method · 0.95
cdcJobEntryExists · Method · 0.95
validateCDCMirrorImpl · Method · 0.95
createCDCFlow · Method · 0.95
NewInternalApiError · Function · 0.85
NewAlreadyExistsApiError · Function · 0.85
getWorkflowID · Function · 0.85