MCPcopy
hub / github.com/PeerDB-io/peerdb / createCDCFlow

Method createCDCFlow

flow/cmd/handler.go:223–247  ·  view source on GitHub ↗
(
	ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string,
)

Source from the content-addressed store, hash-verified

221}
222
223func (h *FlowRequestHandler) createCDCFlow(
224 ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string,
225) (*protos.CreateCDCFlowResponse, error) {
226 workflowOptions := client.StartWorkflowOptions{
227 ID: workflowID,
228 TaskQueue: h.peerflowTaskQueueID,
229 TypedSearchAttributes: shared.NewSearchAttributes(connectionConfigs.FlowJobName),
230 WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, // two racing requests end up with the same workflow
231 WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, // but creating the same id as a completed one is allowed
232 }
233
234 if err := h.createCdcJobEntry(ctx, connectionConfigs, workflowID, true); err != nil {
235 slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err))
236 return nil, fmt.Errorf("unable to create flow job entry: %w", err)
237 }
238
239 if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, connectionConfigs, nil); err != nil {
240 slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err))
241 return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
242 }
243
244 return &protos.CreateCDCFlowResponse{
245 WorkflowId: workflowID,
246 }, nil
247}
248
249func (h *FlowRequestHandler) CreateQRepFlow(
250 ctx context.Context, req *protos.CreateQRepFlowRequest,

Callers 2

CreateCDCFlowMethod · 0.95

Calls 2

createCdcJobEntryMethod · 0.95
ExecuteWorkflowMethod · 0.80

Tested by

no test coverage detected