Forward shovels bytes between a Forward gRPC stream and an upstream HTTP request. First request message carries path/method/headers and the initial body chunk; subsequent messages append body chunks. The first reply carries upstream status + response headers; subsequent replies stream body chunks un
(ctx context.Context, in <-chan *pb.ForwardRequest, out chan<- *pb.ForwardReply)
| 266 | // Cancellation of ctx (the gRPC stream context) closes the upstream |
| 267 | // connection. |
| 268 | func (c *CloudProxy) Forward(ctx context.Context, in <-chan *pb.ForwardRequest, out chan<- *pb.ForwardReply) error { |
| 269 | defer close(out) |
| 270 | |
| 271 | cfg := c.cfg.Load() |
| 272 | if cfg == nil { |
| 273 | return grpcerrors.ModelNotLoaded("cloud-proxy") |
| 274 | } |
| 275 | if cfg.mode != modePassthrough { |
| 276 | return fmt.Errorf("cloud-proxy: Forward only valid in passthrough mode (have %s)", cfg.mode) |
| 277 | } |
| 278 | |
| 279 | first, ok := <-in |
| 280 | if !ok { |
| 281 | return errors.New("cloud-proxy: Forward stream closed before first request") |
| 282 | } |
| 283 | |
| 284 | // Honour the per-request path only when the configured upstream_url |
| 285 | // has no path of its own — gallery convention is to put the |
| 286 | // canonical path in upstream_url. |
| 287 | fullURL, err := composeURL(cfg.upstreamURL, first.GetPath()) |
| 288 | if err != nil { |
| 289 | return err |
| 290 | } |
| 291 | |
| 292 | method := first.GetMethod() |
| 293 | if method == "" { |
| 294 | method = http.MethodPost |
| 295 | } |
| 296 | |
| 297 | // Pipe the body in from the gRPC stream so the HTTP request can |
| 298 | // start before the client finishes sending. The pipe-reader is |
| 299 | // closed via CloseWithError on the error paths so the writer |
| 300 | // goroutine doesn't block forever. |
| 301 | pr, pw := io.Pipe() |
| 302 | |
| 303 | go func() { |
| 304 | var writeErr error |
| 305 | defer func() { _ = pw.CloseWithError(writeErr) }() |
| 306 | if len(first.GetBodyChunk()) > 0 { |
| 307 | if _, writeErr = pw.Write(first.GetBodyChunk()); writeErr != nil { |
| 308 | return |
| 309 | } |
| 310 | } |
| 311 | for req := range in { |
| 312 | if len(req.GetBodyChunk()) == 0 { |
| 313 | continue |
| 314 | } |
| 315 | if _, writeErr = pw.Write(req.GetBodyChunk()); writeErr != nil { |
| 316 | return |
| 317 | } |
| 318 | } |
| 319 | }() |
| 320 | |
| 321 | req, err := http.NewRequestWithContext(ctx, method, fullURL, pr) |
| 322 | if err != nil { |
| 323 | _ = pr.CloseWithError(err) // unblocks the body-pump's pw.Write |
| 324 | return fmt.Errorf("cloud-proxy: build request: %w", err) |
| 325 | } |
nothing calls this directly
no test coverage detected