GetSchemaOverNetwork checks which group should be serving the schema according to the fingerprint of the predicate and sends the request to that instance.
(ctx context.Context, schema *pb.SchemaRequest)
| 180 | // GetSchemaOverNetwork checks which group should be serving the schema |
| 181 | // according to fingerprint of the predicate and sends it to that instance. |
| 182 | func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ( |
| 183 | []*pb.SchemaNode, error) { |
| 184 | |
| 185 | ctx, span := otel.Tracer("").Start(ctx, "worker.GetSchemaOverNetwork") |
| 186 | defer span.End() |
| 187 | |
| 188 | // There was a health check here which is not needed. The health check should be done by the |
| 189 | // receiver of the request, not the sender. |
| 190 | |
| 191 | if len(schema.Predicates) == 0 && len(schema.Types) > 0 { |
| 192 | return nil, nil |
| 193 | } |
| 194 | |
| 195 | // Map of groupd id => Predicates for that group. |
| 196 | schemaMap := make(map[uint32]*pb.SchemaRequest) |
| 197 | if err := addToSchemaMap(schemaMap, schema); err != nil { |
| 198 | return nil, err |
| 199 | } |
| 200 | |
| 201 | results := make(chan resultErr, len(schemaMap)) |
| 202 | var schemaNodes []*pb.SchemaNode |
| 203 | |
| 204 | for gid, s := range schemaMap { |
| 205 | go getSchemaOverNetwork(ctx, gid, s, results) |
| 206 | } |
| 207 | |
| 208 | // wait for all the goroutines to reply back. |
| 209 | // we return if an error was returned or the parent called ctx.Done() |
| 210 | for range schemaMap { |
| 211 | select { |
| 212 | case r := <-results: |
| 213 | if r.err != nil { |
| 214 | return nil, r.err |
| 215 | } |
| 216 | schemaNodes = append(schemaNodes, r.result.Schema...) |
| 217 | case <-ctx.Done(): |
| 218 | return nil, ctx.Err() |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | return schemaNodes, nil |
| 223 | } |
| 224 | |
| 225 | // Schema is used to get schema information over the network on other instances. |
| 226 | func (w *grpcWorker) Schema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, error) { |
no test coverage detected