MCPcopy
hub / github.com/PeerDB-io/peerdb / createCdcJobEntry

Method createCdcJobEntry

flow/cmd/handler.go:87–117  ·  view source on GitHub ↗
(ctx context.Context,
	connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, idempotent bool,
)

Source from the content-addressed store, hash-verified

85}
86
87func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
88 connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, idempotent bool,
89) error {
90 sourcePeerID, srcErr := h.getPeerID(ctx, connectionConfigs.SourceName)
91 if srcErr != nil {
92 return fmt.Errorf("unable to get peer id for source peer %s: %w",
93 connectionConfigs.SourceName, srcErr)
94 }
95
96 destinationPeerID, dstErr := h.getPeerID(ctx, connectionConfigs.DestinationName)
97 if dstErr != nil {
98 return fmt.Errorf("unable to get peer id for target peer %s: %w",
99 connectionConfigs.DestinationName, dstErr)
100 }
101
102 cfgBytes, err := proto.Marshal(connectionConfigs)
103 if err != nil {
104 return fmt.Errorf("unable to marshal flow config: %w", err)
105 }
106
107 if _, err = h.pool.Exec(ctx,
108 `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description)
109 VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`,
110 workflowID, connectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP,
111 ); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) {
112 return fmt.Errorf("unable to insert into flows table for flow %s: %w",
113 connectionConfigs.FlowJobName, err)
114 }
115
116 return nil
117}
118
119func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
120 req *protos.CreateQRepFlowRequest, workflowID string,

Callers 1

createCDCFlow (Method) · 0.95

Calls 2

getPeerID (Method) · 0.95
Exec (Method) · 0.65

Tested by

no test coverage detected