MCPcopy
hub / github.com/fabiolb/fabio / Stream

Method Stream

proxy/grpc_handler.go:95–128  ·  view source on GitHub ↗
(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)

Source from the content-addressed store, hash-verified

93}
94
95func (g GrpcProxyInterceptor) Stream(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
96 ctx := stream.Context()
97
98 target, err := g.lookup(ctx, info.FullMethod)
99
100 if err != nil {
101 log.Println("[ERROR] grpc: error looking up route", err)
102 return status.Error(codes.Internal, "internal error")
103 }
104
105 if target == nil {
106 g.StatsHandler.NoRoute.Add(1)
107 log.Println("[WARN] grpc: no route found for", info.FullMethod)
108 return status.Error(codes.NotFound, "no route found")
109 }
110
111 ctx = context.WithValue(ctx, targetKey{}, target)
112
113 proxyStream := proxyStream{
114 ServerStream: stream,
115 ctx: ctx,
116 }
117
118 start := time.Now()
119
120 err = handler(srv, proxyStream)
121
122 end := time.Now()
123 dur := end.Sub(start)
124
125 target.Timer.Observe(dur.Seconds())
126
127 return err
128}
129
130func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) (*route.Target, error) {
131 pick := route.Picker[g.Config.Proxy.Strategy]

Callers

nothing calls this directly

Calls 5

lookupMethod · 0.95
ContextMethod · 0.80
ErrorMethod · 0.80
AddMethod · 0.45
ObserveMethod · 0.45

Tested by

no test coverage detected