(ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, idempotent bool, )
| 85 | } |
| 86 | |
| 87 | func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context, |
| 88 | connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, idempotent bool, |
| 89 | ) error { |
| 90 | sourcePeerID, srcErr := h.getPeerID(ctx, connectionConfigs.SourceName) |
| 91 | if srcErr != nil { |
| 92 | return fmt.Errorf("unable to get peer id for source peer %s: %w", |
| 93 | connectionConfigs.SourceName, srcErr) |
| 94 | } |
| 95 | |
| 96 | destinationPeerID, dstErr := h.getPeerID(ctx, connectionConfigs.DestinationName) |
| 97 | if dstErr != nil { |
| 98 | return fmt.Errorf("unable to get peer id for target peer %s: %w", |
| 99 | connectionConfigs.DestinationName, dstErr) |
| 100 | } |
| 101 | |
| 102 | cfgBytes, err := proto.Marshal(connectionConfigs) |
| 103 | if err != nil { |
| 104 | return fmt.Errorf("unable to marshal flow config: %w", err) |
| 105 | } |
| 106 | |
| 107 | if _, err = h.pool.Exec(ctx, |
| 108 | `INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, description) |
| 109 | VALUES ($1,$2,$3,$4,$5,$6,'gRPC')`, |
| 110 | workflowID, connectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, cfgBytes, protos.FlowStatus_STATUS_SETUP, |
| 111 | ); err != nil && !(idempotent && shared.IsSQLStateError(err, pgerrcode.UniqueViolation)) { |
| 112 | return fmt.Errorf("unable to insert into flows table for flow %s: %w", |
| 113 | connectionConfigs.FlowJobName, err) |
| 114 | } |
| 115 | |
| 116 | return nil |
| 117 | } |
| 118 | |
| 119 | func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context, |
| 120 | req *protos.CreateQRepFlowRequest, workflowID string, |
no test coverage detected