From ece3ed84439c2e4ad46bd94d35c7374914c99263 Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Tue, 22 Jul 2014 17:58:24 -0300 Subject: [PATCH] Incrementally backoff when reconnecting to peers This ensures we backoff when reconnecting to peers for which we don't understand the replies, just like we do for peers we fail to connect to. Closes #103 --- peer.go | 69 ++++++++++++++++++++++++------------------------------- server.go | 13 +++++------ 2 files changed, 36 insertions(+), 46 deletions(-) diff --git a/peer.go b/peer.go index 97a9e3196..79cc7a031 100644 --- a/peer.go +++ b/peer.go @@ -1527,6 +1527,7 @@ out: // ok we got a message, reset the timer. // timer just calls p.Disconnect() after logging. idleTimer.Reset(idleTimeoutMinutes * time.Minute) + p.retryCount = 0 } idleTimer.Stop() @@ -1910,10 +1911,11 @@ func newInboundPeer(s *server, conn net.Conn) *peer { // newOutbountPeer returns a new outbound bitcoin peer for the provided server and // address and connects to it asynchronously. If the connection is successful // then the peer will also be started. -func newOutboundPeer(s *server, addr string, persistent bool) *peer { +func newOutboundPeer(s *server, addr string, persistent bool, retryCount int64) *peer { p := newPeerBase(s, false) p.addr = addr p.persistent = persistent + p.retryCount = retryCount // Setup p.na with a temporary address that we are connecting to with // faked up service flags. We will replace this with the real one after @@ -1945,46 +1947,35 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { } go func() { - // Attempt to connect to the peer. If the connection fails and - // this is a persistent connection, retry after the retry - // interval. - for atomic.LoadInt32(&p.disconnect) == 0 { - srvrLog.Debugf("Attempting to connect to %s", addr) - conn, err := btcdDial("tcp", addr) - if err != nil { - p.retryCount++ - srvrLog.Debugf("Failed to connect to %s: %v", - addr, err) - if !persistent { - p.server.donePeers <- p - return - } - scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2 - scaledDuration := time.Duration(scaledInterval) - srvrLog.Debugf("Retrying connection to %s in "+ - "%s", addr, scaledDuration) - time.Sleep(scaledDuration) - continue - } - - // While we were sleeping trying to connect, the server - // may have scheduled a shutdown. In that case ditch - // the peer immediately. - if atomic.LoadInt32(&p.disconnect) == 0 { - p.timeConnected = time.Now() - p.server.addrManager.Attempt(p.na) - - // Connection was successful so log it and start peer. - srvrLog.Debugf("Connected to %s", - conn.RemoteAddr()) - p.conn = conn - atomic.AddInt32(&p.connected, 1) - p.retryCount = 0 - p.Start() - } - + if atomic.LoadInt32(&p.disconnect) != 0 { return } + if p.retryCount > 0 { + scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2 + scaledDuration := time.Duration(scaledInterval) + srvrLog.Debugf("Retrying connection to %s in %s", addr, scaledDuration) + time.Sleep(scaledDuration) + } + srvrLog.Debugf("Attempting to connect to %s", addr) + conn, err := btcdDial("tcp", addr) + if err != nil { + srvrLog.Debugf("Failed to connect to %s: %v", addr, err) + p.server.donePeers <- p + return + } + + // We may have slept and the server may have scheduled a shutdown. In that + // case ditch the peer immediately. + if atomic.LoadInt32(&p.disconnect) == 0 { + p.timeConnected = time.Now() + p.server.addrManager.Attempt(p.na) + + // Connection was successful so log it and start peer. + srvrLog.Debugf("Connected to %s", conn.RemoteAddr()) + p.conn = conn + atomic.AddInt32(&p.connected, 1) + p.Start() + } }() return p } diff --git a/server.go b/server.go index 0e4759484..3e6640cc0 100644 --- a/server.go +++ b/server.go @@ -252,9 +252,8 @@ func (s *server) handleDonePeerMsg(state *peerState, p *peer) { if e.Value == p { // Issue an asynchronous reconnect if the peer was a // persistent outbound connection. - if !p.inbound && p.persistent && - atomic.LoadInt32(&s.shutdown) == 0 { - e.Value = newOutboundPeer(s, p.addr, true) + if !p.inbound && p.persistent && atomic.LoadInt32(&s.shutdown) == 0 { + e.Value = newOutboundPeer(s, p.addr, true, p.retryCount+1) return } if !p.inbound { @@ -431,7 +430,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { } // TODO(oga) if too many, nuke a non-perm peer. if s.handleAddPeerMsg(state, - newOutboundPeer(s, msg.addr, msg.permanent)) { + newOutboundPeer(s, msg.addr, msg.permanent, 0)) { msg.reply <- nil } else { msg.reply <- errors.New("failed to add peer") @@ -573,7 +572,7 @@ func (s *server) peerHandler() { permanentPeers = cfg.AddPeers } for _, addr := range permanentPeers { - s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) + s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true, 0)) } // if nothing else happens, wake us up soon. @@ -686,11 +685,11 @@ out: // any failure will be due to banned peers etc. we have // already checked that we have room for more peers. if s.handleAddPeerMsg(state, - newOutboundPeer(s, addrStr, false)) { + newOutboundPeer(s, addrStr, false, 0)) { } } - // We we need more peers, wake up in ten seconds and try again. + // We need more peers, wake up in ten seconds and try again. if state.NeedMoreOutbound() { time.AfterFunc(10*time.Second, func() { s.wakeup <- struct{}{}