(ctx context.Context, req *protos.CreateQRepFlowRequest, workflowID string, )
| 117 | } |
| 118 | |
| 119 | func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context, |
| 120 | req *protos.CreateQRepFlowRequest, workflowID string, |
| 121 | ) error { |
| 122 | sourcePeerName := req.QrepConfig.SourceName |
| 123 | sourcePeerID, srcErr := h.getPeerID(ctx, sourcePeerName) |
| 124 | if srcErr != nil { |
| 125 | return fmt.Errorf("unable to get peer id for source peer %s: %w", |
| 126 | sourcePeerName, srcErr) |
| 127 | } |
| 128 | |
| 129 | destinationPeerName := req.QrepConfig.DestinationName |
| 130 | destinationPeerID, dstErr := h.getPeerID(ctx, destinationPeerName) |
| 131 | if dstErr != nil { |
| 132 | return fmt.Errorf("unable to get peer id for target peer %s: %w", |
| 133 | destinationPeerName, dstErr) |
| 134 | } |
| 135 | |
| 136 | cfgBytes, err := proto.Marshal(req.QrepConfig) |
| 137 | if err != nil { |
| 138 | return fmt.Errorf("unable to marshal qrep config: %w", err) |
| 139 | } |
| 140 | |
| 141 | flowName := req.QrepConfig.FlowJobName |
| 142 | if _, err := h.pool.Exec(ctx, `INSERT INTO flows(workflow_id,name,source_peer,destination_peer,config_proto,status, |
| 143 | description, query_string) VALUES ($1,$2,$3,$4,$5,$6,'gRPC',$7) |
| 144 | `, workflowID, flowName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_RUNNING, |
| 145 | req.QrepConfig.Query, |
| 146 | ); err != nil { |
| 147 | return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w", |
| 148 | flowName, req.QrepConfig.WatermarkTable, err) |
| 149 | } |
| 150 | |
| 151 | return nil |
| 152 | } |
| 153 | |
| 154 | func getWorkflowID(flowName string) string { |
| 155 | return flowName + "-peerflow" |
no test coverage detected