MCPcopy
hub / github.com/mudler/LocalAI / Forward

Method Forward

backend/go/cloud-proxy/proxy.go:268–390  ·  view source on GitHub ↗

Forward shovels bytes between a Forward gRPC stream and an upstream HTTP request. First request message carries path/method/headers and the initial body chunk; subsequent messages append body chunks. The first reply carries upstream status + response headers; subsequent replies stream body chunks un

(ctx context.Context, in <-chan *pb.ForwardRequest, out chan<- *pb.ForwardReply)

Source from the content-addressed store, hash-verified

266// Cancellation of ctx (the gRPC stream context) closes the upstream
267// connection.
268func (c *CloudProxy) Forward(ctx context.Context, in <-chan *pb.ForwardRequest, out chan<- *pb.ForwardReply) error {
269 defer close(out)
270
271 cfg := c.cfg.Load()
272 if cfg == nil {
273 return grpcerrors.ModelNotLoaded("cloud-proxy")
274 }
275 if cfg.mode != modePassthrough {
276 return fmt.Errorf("cloud-proxy: Forward only valid in passthrough mode (have %s)", cfg.mode)
277 }
278
279 first, ok := <-in
280 if !ok {
281 return errors.New("cloud-proxy: Forward stream closed before first request")
282 }
283
284 // Honour the per-request path only when the configured upstream_url
285 // has no path of its own — gallery convention is to put the
286 // canonical path in upstream_url.
287 fullURL, err := composeURL(cfg.upstreamURL, first.GetPath())
288 if err != nil {
289 return err
290 }
291
292 method := first.GetMethod()
293 if method == "" {
294 method = http.MethodPost
295 }
296
297 // Pipe the body in from the gRPC stream so the HTTP request can
298 // start before the client finishes sending. The pipe-reader is
299 // closed via CloseWithError on the error paths so the writer
300 // goroutine doesn't block forever.
301 pr, pw := io.Pipe()
302
303 go func() {
304 var writeErr error
305 defer func() { _ = pw.CloseWithError(writeErr) }()
306 if len(first.GetBodyChunk()) > 0 {
307 if _, writeErr = pw.Write(first.GetBodyChunk()); writeErr != nil {
308 return
309 }
310 }
311 for req := range in {
312 if len(req.GetBodyChunk()) == 0 {
313 continue
314 }
315 if _, writeErr = pw.Write(req.GetBodyChunk()); writeErr != nil {
316 return
317 }
318 }
319 }()
320
321 req, err := http.NewRequestWithContext(ctx, method, fullURL, pr)
322 if err != nil {
323 _ = pr.CloseWithError(err) // unblocks the body-pump's pw.Write
324 return fmt.Errorf("cloud-proxy: build request: %w", err)
325 }

Callers

nothing calls this directly

Calls 12

ModelNotLoadedFunction · 0.92
composeURLFunction · 0.85
isHopByHopHeaderFunction · 0.85
applyAuthHeaderFunction · 0.85
AddMethod · 0.80
LoadMethod · 0.65
GetNameMethod · 0.65
CloseMethod · 0.65
copyFunction · 0.50
WriteMethod · 0.45
ReadMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected