(quicStream quic.Stream)
| 167 | } |
| 168 | |
| 169 | func (q *quicConnection) runStream(quicStream quic.Stream) { |
| 170 | ctx := quicStream.Context() |
| 171 | stream := cfdquic.NewSafeStreamCloser(quicStream, q.streamWriteTimeout, q.logger) |
| 172 | defer func() { _ = stream.Close() }() |
| 173 | |
| 174 | // we are going to fuse readers/writers from stream <- cloudflared -> origin, and we want to guarantee that |
| 175 | // code executed in the code path of handleStream don't trigger an earlier close to the downstream write stream. |
| 176 | // So, we wrap the stream with a no-op write closer and only this method can actually close write side of the stream. |
| 177 | // A call to close will simulate a close to the read-side, which will fail subsequent reads. |
| 178 | noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream} |
| 179 | ss := rpcquic.NewCloudflaredServer(q.handleDataStream, q.datagramHandler, q, q.rpcTimeout) |
| 180 | if err := ss.Serve(ctx, noCloseStream); err != nil { |
| 181 | q.logger.Debug().Err(err).Msg("Failed to handle QUIC stream") |
| 182 | |
| 183 | // if we received an error at this level, then close write side of stream with an error, which will result in |
| 184 | // RST_STREAM frame. |
| 185 | quicStream.CancelWrite(0) |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | func (q *quicConnection) handleDataStream(ctx context.Context, stream *rpcquic.RequestServerStream) error { |
| 190 | request, err := stream.ReadConnectRequestData() |
no test coverage detected