[NOD-1041] Call outboundPeerConnected and outboundPeerConnectionFailed directly instead of routing them through peerHandler (#748)

* [NOD-1041] Fix a deadlock between connHandler and peerHandler.

* [NOD-1041] Simplified the fix.
This commit is contained in:
stasatdaglabs 2020-06-07 16:35:48 +03:00 committed by GitHub
parent 35c733a4c1
commit 9c78a797e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -109,15 +109,6 @@ type relayMsg struct {
data interface{}
}
type outboundPeerConnectedMsg struct {
connReq *connmgr.ConnReq
conn net.Conn
}
type outboundPeerConnectionFailedMsg struct {
connReq *connmgr.ConnReq
}
// Peer extends the peer to maintain state shared by the server and
// the blockmanager.
type Peer struct {
@ -229,19 +220,17 @@ type Server struct {
DAG *blockdag.BlockDAG
TxMemPool *mempool.TxPool
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
newOutboundConnection chan *outboundPeerConnectedMsg
newOutboundConnectionFailed chan *outboundPeerConnectionFailedMsg
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
// We add to quitWaitGroup before every instance in which we wait for
// the quit channel so that all those instances finish before we shut
@ -977,17 +966,17 @@ func (s *Server) inboundPeerConnected(conn net.Conn) {
// peer instance, associates it with the relevant state such as the connection
// request instance and the connection itself, and finally notifies the address
// manager of the attempt.
func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnectedMsg) {
sp := newServerPeer(s, msg.connReq.Permanent)
outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), msg.connReq.Addr.String())
func (s *Server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
sp := newServerPeer(s, connReq.Permanent)
outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), connReq.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot create outbound peer %s: %s", msg.connReq.Addr, err)
s.connManager.Disconnect(msg.connReq.ID())
srvrLog.Debugf("Cannot create outbound peer %s: %s", connReq.Addr, err)
s.connManager.Disconnect(connReq.ID())
}
sp.Peer = outboundPeer
sp.connReq = msg.connReq
sp.connReq = connReq
s.peerConnected(sp, msg.conn)
s.peerConnected(sp, conn)
s.addrManager.Attempt(sp.NA())
}
@ -1012,20 +1001,20 @@ func (s *Server) peerConnected(sp *Peer, conn net.Conn) {
// outboundPeerConnected is invoked by the connection manager when a new
// outbound connection failed to be established.
func (s *Server) outboundPeerConnectionFailed(msg *outboundPeerConnectionFailedMsg) {
func (s *Server) outboundPeerConnectionFailed(connReq *connmgr.ConnReq) {
// If the connection request has no address
// associated to it, do nothing.
if msg.connReq.Addr == nil {
if connReq.Addr == nil {
return
}
host, portStr, err := net.SplitHostPort(msg.connReq.Addr.String())
host, portStr, err := net.SplitHostPort(connReq.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot extract address host and port %s: %s", msg.connReq.Addr, err)
srvrLog.Debugf("Cannot extract address host and port %s: %s", connReq.Addr, err)
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
srvrLog.Debugf("Cannot parse port %s: %s", msg.connReq.Addr, err)
srvrLog.Debugf("Cannot parse port %s: %s", connReq.Addr, err)
}
// defaultServices is used here because Attempt makes no use
@ -1137,12 +1126,6 @@ out:
})
s.quitWaitGroup.Done()
break out
case opcMsg := <-s.newOutboundConnection:
s.outboundPeerConnected(state, opcMsg)
case opcfMsg := <-s.newOutboundConnectionFailed:
s.outboundPeerConnectionFailed(opcfMsg)
}
}
@ -1497,23 +1480,21 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
s := Server{
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
newOutboundConnectionFailed: make(chan *outboundPeerConnectionFailedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
}
// Create indexes if needed.
@ -1576,23 +1557,14 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
// Create a connection manager.
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.KaspadDial,
OnConnection: func(c *connmgr.ConnReq, conn net.Conn) {
s.newOutboundConnection <- &outboundPeerConnectedMsg{
connReq: c,
conn: conn,
}
},
OnConnectionFailed: func(c *connmgr.ConnReq) {
s.newOutboundConnectionFailed <- &outboundPeerConnectionFailedMsg{
connReq: c,
}
},
AddrManager: s.addrManager,
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.KaspadDial,
OnConnection: s.outboundPeerConnected,
OnConnectionFailed: s.outboundPeerConnectionFailed,
AddrManager: s.addrManager,
})
if err != nil {
return nil, err