| 171 | } |
| 172 | |
| 173 | func (pool *LoadBalancedPool) Update(instanceInfos []LBPoolInstanceInfo) { |
| 174 | |
| 175 | pool.lock.Lock() |
| 176 | defer pool.lock.Unlock() |
| 177 | newInstances := make(map[string]*instancePool) |
| 178 | var newInstanceList instancePoolSlice |
| 179 | for _, instanceInfo := range instanceInfos { |
| 180 | if _, ok := newInstances[instanceInfo.Addr]; !ok { |
| 181 | var instance *instancePool |
| 182 | if instance, ok = pool.instances[instanceInfo.Addr]; !ok { |
| 183 | instance = pool.newInstancePool(instanceInfo) |
| 184 | } else { |
| 185 | // Update `instanceId` for given instance since same host:port address |
| 186 | // might have a new `instanceId` now. |
| 187 | instance.instanceId = instanceInfo.InstanceId |
| 188 | } |
| 189 | newInstances[instanceInfo.Addr] = instance |
| 190 | newInstanceList = append(newInstanceList, instance) |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | var instanceHashes []uint32 |
| 195 | if pool.strategy == LBConsistentHashing { |
| 196 | // we need to recompute the new hashes |
| 197 | instanceHashes = make([]uint32, len(newInstanceList)) |
| 198 | for i, instance := range newInstanceList { |
| 199 | instanceHashes[i] = hashInstance(pool.hashSeed, pool.hashFunction, instance) |
| 200 | } |
| 201 | } |
| 202 | |
| 203 | // Each strategy has a specific sorter that knows how to sort the instances |
| 204 | pool.sortInstances(newInstanceList, instanceHashes) |
| 205 | |
| 206 | for addr, instancePool := range pool.instances { |
| 207 | // Close out all InstancePools that are not needed anymore. |
| 208 | if _, ok := newInstances[addr]; !ok { |
| 209 | instancePool.Close() |
| 210 | } |
| 211 | } |
| 212 | pool.instances = newInstances |
| 213 | pool.instanceList = newInstanceList |
| 214 | pool.instanceHashes = instanceHashes |
| 215 | pool.markDownUntil = make([]int64, len(newInstanceList)) |
| 216 | |
| 217 | } |
| 218 | |
| 219 | // |
| 220 | // Pool interface methods |