[NOD-934] Fix addresses not getting their retry attempt counter incremented if they fail to connect (#702)

* [NOD-934] Fix addresses not getting their retry attempt counter incremented if they fail to connect.

* [NOD-922] Inline parseNetAddress.

* [NOD-922] Fix debug logs.
This commit is contained in:
stasatdaglabs 2020-04-23 17:01:09 +03:00 committed by GitHub
parent 3af945692e
commit 2910724b49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 28 deletions

View File

@ -151,6 +151,10 @@ type Config struct {
// connection is established.
OnConnection func(*ConnReq, net.Conn)
// OnConnectionFailed is a callback that is fired when a new outbound
// connection has failed to be established.
OnConnectionFailed func(*ConnReq)
// OnDisconnection is a callback that is fired when an outbound
// connection is disconnected.
OnDisconnection func(*ConnReq)
@ -419,6 +423,10 @@ out:
connReq, msg.err)
}
cm.handleFailedConn(connReq, msg.err)
if cm.cfg.OnConnectionFailed != nil {
cm.cfg.OnConnectionFailed(connReq)
}
}
case <-cm.quit:

View File

@ -115,6 +115,10 @@ type outboundPeerConnectedMsg struct {
conn net.Conn
}
type outboundPeerConnectionFailedMsg struct {
connReq *connmgr.ConnReq
}
// Peer extends the peer to maintain state shared by the server and
// the blockmanager.
type Peer struct {
@ -227,18 +231,19 @@ type Server struct {
DAG *blockdag.BlockDAG
TxMemPool *mempool.TxPool
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
newOutboundConnection chan *outboundPeerConnectedMsg
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
newOutboundConnection chan *outboundPeerConnectedMsg
newOutboundConnectionFailed chan *outboundPeerConnectionFailedMsg
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
// We add to quitWaitGroup before every instance in which we wait for
// the quit channel so that all those instances finish before we shut
@ -1039,6 +1044,26 @@ func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnec
state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
}
// outboundPeerConnected is invoked by the connection manager when a new
// outbound connection failed to be established.
func (s *Server) outboundPeerConnectionFailed(msg *outboundPeerConnectionFailedMsg) {
host, portStr, err := net.SplitHostPort(msg.connReq.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot extract address host and port %s: %s", msg.connReq.Addr, err)
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
srvrLog.Debugf("Cannot parse port %s: %s", msg.connReq.Addr, err)
}
// defaultServices is used here because Attempt makes no use
// of the services field and NewNetAddressIPPort does not
// take nil for it.
netAddress := wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), defaultServices)
s.addrManager.Attempt(netAddress)
}
// peerDoneHandler handles peer disconnects by notifiying the server that it's
// done along with other performing other desirable cleanup.
func (s *Server) peerDoneHandler(sp *Peer) {
@ -1144,6 +1169,9 @@ out:
case opcMsg := <-s.newOutboundConnection:
s.outboundPeerConnected(state, opcMsg)
case opcfMsg := <-s.newOutboundConnectionFailed:
s.outboundPeerConnectionFailed(opcfMsg)
}
}
@ -1517,22 +1545,23 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
s := Server{
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
newOutboundConnectionFailed: make(chan *outboundPeerConnectionFailedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
}
// Create indexes if needed.
@ -1657,6 +1686,11 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
conn: conn,
}
},
OnConnectionFailed: func(c *connmgr.ConnReq) {
s.newOutboundConnectionFailed <- &outboundPeerConnectionFailedMsg{
connReq: c,
}
},
GetNewAddress: newAddressFunc,
})
if err != nil {