Handle outbound node communication: read messages from the channel and forward them to remote nodes. FIXME(gene): in case of a failure this will drain the outbound queue, i.e. all unprocessed messages will be dropped. Maybe that is a good thing, maybe not.
()
| 246 | // FIXME(gene): this will drain the outbound queue in case of a failure: all unprocessed messages will be dropped. |
| 247 | // Maybe it's a good thing, maybe not. |
| 248 | func (n *ClusterNode) reconnect() { |
| 249 | var reconnTicker *time.Ticker |
| 250 | |
| 251 | // Avoid parallel reconnection threads. |
| 252 | n.lock.Lock() |
| 253 | if n.reconnecting { |
| 254 | n.lock.Unlock() |
| 255 | return |
| 256 | } |
| 257 | n.reconnecting = true |
| 258 | n.lock.Unlock() |
| 259 | |
| 260 | count := 0 |
| 261 | for { |
| 262 | // Attempt to reconnect right away |
| 263 | if conn, err := net.DialTimeout("tcp", n.address, clusterNetworkTimeout); err == nil { |
| 264 | if reconnTicker != nil { |
| 265 | reconnTicker.Stop() |
| 266 | } |
| 267 | n.lock.Lock() |
| 268 | n.endpoint = rpc.NewClient(conn) |
| 269 | n.connected = true |
| 270 | n.reconnecting = false |
| 271 | n.lock.Unlock() |
| 272 | statsInc("LiveClusterNodes", 1) |
| 273 | logs.Info.Println("cluster: connected to", n.name) |
| 274 | // Send this node credentials to the new node. |
| 275 | var unused bool |
| 276 | n.call("Cluster.Ping", |
| 277 | &ClusterPing{ |
| 278 | Node: globals.cluster.thisNodeName, |
| 279 | Fingerprint: globals.cluster.fingerprint, |
| 280 | }, |
| 281 | &unused) |
| 282 | return |
| 283 | } else if count == 0 { |
| 284 | reconnTicker = time.NewTicker(clusterDefaultReconnectTime) |
| 285 | } |
| 286 | |
| 287 | count++ |
| 288 | |
| 289 | select { |
| 290 | case <-reconnTicker.C: |
| 291 | // Wait for timer to try to reconnect again. Do nothing if the timer is inactive. |
| 292 | case <-n.done: |
| 293 | // Shutting down |
| 294 | logs.Info.Println("cluster: shutdown started at node", n.name) |
| 295 | reconnTicker.Stop() |
| 296 | if n.endpoint != nil { |
| 297 | n.endpoint.Close() |
| 298 | } |
| 299 | n.lock.Lock() |
| 300 | n.connected = false |
| 301 | n.reconnecting = false |
| 302 | n.lock.Unlock() |
| 303 | logs.Info.Println("cluster: shut down completed at node", n.name) |
| 304 | return |
| 305 | } |