diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 99080213f..aaa245c87 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -151,6 +151,10 @@ type Config struct { // connection is established. OnConnection func(*ConnReq, net.Conn) + // OnConnectionFailed is a callback that is fired when a new outbound + // connection has failed to be established. + OnConnectionFailed func(*ConnReq) + // OnDisconnection is a callback that is fired when an outbound // connection is disconnected. OnDisconnection func(*ConnReq) @@ -419,6 +423,10 @@ out: connReq, msg.err) } cm.handleFailedConn(connReq, msg.err) + + if cm.cfg.OnConnectionFailed != nil { + cm.cfg.OnConnectionFailed(connReq) + } } case <-cm.quit: diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index bc3218ff2..8d85b34c5 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -115,6 +115,10 @@ type outboundPeerConnectedMsg struct { conn net.Conn } +type outboundPeerConnectionFailedMsg struct { + connReq *connmgr.ConnReq +} + // Peer extends the peer to maintain state shared by the server and // the blockmanager. type Peer struct { @@ -227,18 +231,19 @@ type Server struct { DAG *blockdag.BlockDAG TxMemPool *mempool.TxPool - modifyRebroadcastInv chan interface{} - newPeers chan *Peer - donePeers chan *Peer - banPeers chan *Peer - newOutboundConnection chan *outboundPeerConnectedMsg - Query chan interface{} - relayInv chan relayMsg - broadcast chan broadcastMsg - wg sync.WaitGroup - nat serverutils.NAT - TimeSource blockdag.TimeSource - services wire.ServiceFlag + modifyRebroadcastInv chan interface{} + newPeers chan *Peer + donePeers chan *Peer + banPeers chan *Peer + newOutboundConnection chan *outboundPeerConnectedMsg + newOutboundConnectionFailed chan *outboundPeerConnectionFailedMsg + Query chan interface{} + relayInv chan relayMsg + broadcast chan broadcastMsg + wg sync.WaitGroup + nat serverutils.NAT + TimeSource blockdag.TimeSource + services wire.ServiceFlag // We add to quitWaitGroup before every instance in which we wait for // the quit channel so that all those instances finish before we shut @@ -1039,6 +1044,26 @@ func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnec state.outboundGroups[addrmgr.GroupKey(sp.NA())]++ } +// outboundPeerConnected is invoked by the connection manager when a new +// outbound connection failed to be established. +func (s *Server) outboundPeerConnectionFailed(msg *outboundPeerConnectionFailedMsg) { + host, portStr, err := net.SplitHostPort(msg.connReq.Addr.String()) + if err != nil { + srvrLog.Debugf("Cannot extract address host and port %s: %s", msg.connReq.Addr, err) + } + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + srvrLog.Debugf("Cannot parse port %s: %s", msg.connReq.Addr, err) + } + + // defaultServices is used here because Attempt makes no use + // of the services field and NewNetAddressIPPort does not + // take nil for it. + netAddress := wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), defaultServices) + + s.addrManager.Attempt(netAddress) +} + // peerDoneHandler handles peer disconnects by notifiying the server that it's // done along with other performing other desirable cleanup. func (s *Server) peerDoneHandler(sp *Peer) { @@ -1144,6 +1169,9 @@ out: case opcMsg := <-s.newOutboundConnection: s.outboundPeerConnected(state, opcMsg) + + case opcfMsg := <-s.newOutboundConnectionFailed: + s.outboundPeerConnectionFailed(opcfMsg) } } @@ -1517,22 +1545,23 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers s := Server{ - DAGParams: dagParams, - addrManager: amgr, - newPeers: make(chan *Peer, maxPeers), - donePeers: make(chan *Peer, maxPeers), - banPeers: make(chan *Peer, maxPeers), - Query: make(chan interface{}), - relayInv: make(chan relayMsg, maxPeers), - broadcast: make(chan broadcastMsg, maxPeers), - quit: make(chan struct{}), - modifyRebroadcastInv: make(chan interface{}), - newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers), - nat: nat, - TimeSource: blockdag.NewTimeSource(), - services: services, - SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize), - notifyNewTransactions: notifyNewTransactions, + DAGParams: dagParams, + addrManager: amgr, + newPeers: make(chan *Peer, maxPeers), + donePeers: make(chan *Peer, maxPeers), + banPeers: make(chan *Peer, maxPeers), + Query: make(chan interface{}), + relayInv: make(chan relayMsg, maxPeers), + broadcast: make(chan broadcastMsg, maxPeers), + quit: make(chan struct{}), + modifyRebroadcastInv: make(chan interface{}), + newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers), + newOutboundConnectionFailed: make(chan *outboundPeerConnectionFailedMsg, config.ActiveConfig().TargetOutboundPeers), + nat: nat, + TimeSource: blockdag.NewTimeSource(), + services: services, + SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize), + notifyNewTransactions: notifyNewTransactions, } // Create indexes if needed. @@ -1657,6 +1686,11 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch conn: conn, } }, + OnConnectionFailed: func(c *connmgr.ConnReq) { + s.newOutboundConnectionFailed <- &outboundPeerConnectionFailedMsg{ + connReq: c, + } + }, GetNewAddress: newAddressFunc, }) if err != nil {