(ctx context.Context, stream *rpcquic.RequestServerStream)
| 187 | } |
| 188 | |
| 189 | func (q *quicConnection) handleDataStream(ctx context.Context, stream *rpcquic.RequestServerStream) error { |
| 190 | request, err := stream.ReadConnectRequestData() |
| 191 | if err != nil { |
| 192 | return err |
| 193 | } |
| 194 | |
| 195 | if err, connectResponseSent := q.dispatchRequest(ctx, stream, request); err != nil { |
| 196 | q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed") |
| 197 | |
| 198 | // if the connectResponse was already sent and we had an error, we need to propagate it up, so that the stream is |
| 199 | // closed with an RST_STREAM frame |
| 200 | if connectResponseSent { |
| 201 | return err |
| 202 | } |
| 203 | |
| 204 | var metadata []pogs.Metadata |
| 205 | // Check the type of error that was throw and add metadata that will help identify it on OTD. |
| 206 | if errors.Is(err, cfdflow.ErrTooManyActiveFlows) { |
| 207 | metadata = append(metadata, pogs.ErrorFlowConnectRateLimitedMetadata) |
| 208 | } |
| 209 | |
| 210 | if writeRespErr := stream.WriteConnectResponseData(err, metadata...); writeRespErr != nil { |
| 211 | return writeRespErr |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | return nil |
| 216 | } |
| 217 | |
| 218 | // dispatchRequest will dispatch the request to the origin depending on the type and returns an error if it occurs. |
| 219 | // Also returns if the connect response was sent to the downstream during processing of the origin request. |
nothing calls this directly
no test coverage detected