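fetchRoutingTable refreshes the routing table stored on the cluster client by fetching the latest version from the cluster. If no routing table has been stored yet (that is, on the first successful fetch rather than merely the first invocation), it also sets the partition count to the length of the fetched table. It returns a wrapped error if fetching fails, in which case neither the stored table nor the partition count is changed.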
()
| 755 | // fetchRoutingTable asks the cluster for its current routing table and stores it on the client, |
| 756 | // replacing any previously stored table. It returns a wrapped error when the table cannot be loaded. |
| 757 | func (cl *ClusterClient) fetchRoutingTable() error { |
| 758 | ctx, cancel := context.WithCancel(cl.ctx) |
| 759 | defer cancel() |
| 760 | |
| 761 | table, err := cl.RoutingTable(ctx) |
| 762 | if err != nil { |
| 763 | return fmt.Errorf("error while loading the routing table: %w", err) |
| 764 | } |
| 765 | |
| 766 | // Nothing stored yet means this is the first successful fetch: take the partition count from the |
| 767 | // table length. It is effectively a constant and is expected to be greater than zero (not checked here). |
| 768 | if cl.routingTable.Load() == nil { |
| 769 | cl.partitionCount = uint64(len(table)) |
| 770 | } |
| 771 | cl.routingTable.Store(table) |
| 772 | return nil |
| 773 | } |
| 774 | |
| 775 | // fetchRoutingTablePeriodically periodically updates the routing table by invoking fetchRoutingTable at configured intervals. |
| 776 | // It stops gracefully when the context is canceled or an error occurs. |
no test coverage detected