(ctx context.Context)
| 65 | } |
| 66 | |
| 67 | func (c *InFlightTrackingClient) track(ctx context.Context) func() { |
| 68 | if err := c.registry.IncrementInFlight(ctx, c.nodeID, c.modelName, c.replicaIndex); err != nil { |
| 69 | xlog.Warn("Failed to increment in-flight counter", "node", c.nodeID, "model", c.modelName, "replica", c.replicaIndex, "error", err) |
| 70 | return func() {} |
| 71 | } |
| 72 | return func() { |
| 73 | decCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 74 | defer cancel() |
| 75 | c.registry.DecrementInFlight(decCtx, c.nodeID, c.modelName, c.replicaIndex) |
| 76 | // Release the initial reservation after the first inference call completes |
| 77 | if c.onFirstComplete != nil { |
| 78 | c.firstOnce.Do(c.onFirstComplete) |
| 79 | } |
| 80 | } |
| 81 | } |
| 82 | |
| 83 | // reconcile self-heals stale routing: when a backend reports that the model is |
| 84 | // no longer loaded (the process survived but the model was evicted, while the |
no test coverage detected