MCPcopy
hub / github.com/PeerDB-io/peerdb / createQRepJobEntry

Method createQRepJobEntry

flow/cmd/handler.go:119–152  ·  view source on GitHub ↗
(ctx context.Context,
	req *protos.CreateQRepFlowRequest, workflowID string,
)

Source from the content-addressed store, hash-verified

117}
118
119func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
120 req *protos.CreateQRepFlowRequest, workflowID string,
121) error {
122 sourcePeerName := req.QrepConfig.SourceName
123 sourcePeerID, srcErr := h.getPeerID(ctx, sourcePeerName)
124 if srcErr != nil {
125 return fmt.Errorf("unable to get peer id for source peer %s: %w",
126 sourcePeerName, srcErr)
127 }
128
129 destinationPeerName := req.QrepConfig.DestinationName
130 destinationPeerID, dstErr := h.getPeerID(ctx, destinationPeerName)
131 if dstErr != nil {
132 return fmt.Errorf("unable to get peer id for target peer %s: %w",
133 destinationPeerName, dstErr)
134 }
135
136 cfgBytes, err := proto.Marshal(req.QrepConfig)
137 if err != nil {
138 return fmt.Errorf("unable to marshal qrep config: %w", err)
139 }
140
141 flowName := req.QrepConfig.FlowJobName
142 if _, err := h.pool.Exec(ctx, `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status,
143 description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7)
144 `, workflowID, flowName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_RUNNING,
145 req.QrepConfig.Query,
146 ); err != nil {
147 return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
148 flowName, req.QrepConfig.WatermarkTable, err)
149 }
150
151 return nil
152}
153
154func getWorkflowID(flowName string) string {
155 return flowName + "-peerflow"

Callers 1

CreateQRepFlowMethod · 0.95

Calls 2

getPeerIDMethod · 0.95
ExecMethod · 0.65

Tested by

no test coverage detected