| 93 | } |
| 94 | |
| 95 | func (g GrpcProxyInterceptor) Stream(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { |
| 96 | ctx := stream.Context() |
| 97 | |
| 98 | target, err := g.lookup(ctx, info.FullMethod) |
| 99 | |
| 100 | if err != nil { |
| 101 | log.Println("[ERROR] grpc: error looking up route", err) |
| 102 | return status.Error(codes.Internal, "internal error") |
| 103 | } |
| 104 | |
| 105 | if target == nil { |
| 106 | g.StatsHandler.NoRoute.Add(1) |
| 107 | log.Println("[WARN] grpc: no route found for", info.FullMethod) |
| 108 | return status.Error(codes.NotFound, "no route found") |
| 109 | } |
| 110 | |
| 111 | ctx = context.WithValue(ctx, targetKey{}, target) |
| 112 | |
| 113 | proxyStream := proxyStream{ |
| 114 | ServerStream: stream, |
| 115 | ctx: ctx, |
| 116 | } |
| 117 | |
| 118 | start := time.Now() |
| 119 | |
| 120 | err = handler(srv, proxyStream) |
| 121 | |
| 122 | end := time.Now() |
| 123 | dur := end.Sub(start) |
| 124 | |
| 125 | target.Timer.Observe(dur.Seconds()) |
| 126 | |
| 127 | return err |
| 128 | } |
| 129 | |
| 130 | func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) (*route.Target, error) { |
| 131 | pick := route.Picker[g.Config.Proxy.Strategy] |