diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 0bd5b221d..e4f596835 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -178,6 +178,8 @@ type ConnManager struct { start int32 stop int32 + newConnReqMtx sync.Mutex + cfg Config wg sync.WaitGroup failedAttempts uint64 @@ -270,7 +272,7 @@ out: delete(pending, connReq.id) if cm.cfg.OnConnection != nil { - go cm.cfg.OnConnection(connReq, msg.conn) + cm.cfg.OnConnection(connReq, msg.conn) } case handleDisconnected: @@ -359,6 +361,8 @@ out: // NewConnReq creates a new connection request and connects to the // corresponding address. func (cm *ConnManager) NewConnReq() { + cm.newConnReqMtx.Lock() + defer cm.newConnReqMtx.Unlock() if atomic.LoadInt32(&cm.stop) != 0 { return } diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index 97411ae04..55bc4902f 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -137,6 +137,11 @@ type relayMsg struct { data interface{} } +type outboundPeerConnectedMsg struct { + connReq *connmgr.ConnReq + conn net.Conn +} + // updatePeerHeightsMsg is a message sent from the blockmanager to the server // after a new block has been accepted. The purpose of the message is to update // the heights of peers that were known to announce the block before we @@ -262,19 +267,20 @@ type Server struct { DAG *blockdag.BlockDAG TxMemPool *mempool.TxPool - modifyRebroadcastInv chan interface{} - newPeers chan *Peer - donePeers chan *Peer - banPeers chan *Peer - Query chan interface{} - relayInv chan relayMsg - broadcast chan broadcastMsg - wg sync.WaitGroup - quit chan struct{} - nat serverutils.NAT - db database.DB - TimeSource blockdag.MedianTimeSource - services wire.ServiceFlag + modifyRebroadcastInv chan interface{} + newPeers chan *Peer + donePeers chan *Peer + banPeers chan *Peer + newOutboundConnection chan *outboundPeerConnectedMsg + Query chan interface{} + relayInv chan relayMsg + broadcast chan broadcastMsg + wg sync.WaitGroup + quit chan struct{} + nat serverutils.NAT + db database.DB + TimeSource blockdag.MedianTimeSource + services wire.ServiceFlag // The following fields are used for optional indexes. They will be nil // if the associated index is not enabled. These fields are set during @@ -695,7 +701,6 @@ func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool { if sp.Inbound() { state.inboundPeers[sp.ID()] = sp } else { - state.outboundGroups[addrmgr.GroupKey(sp.NA())]++ if sp.persistent { state.persistentPeers[sp.ID()] = sp } else { @@ -1110,21 +1115,22 @@ func (s *Server) inboundPeerConnected(conn net.Conn) { // peer instance, associates it with the relevant state such as the connection // request instance and the connection itself, and finally notifies the address // manager of the attempt. -func (s *Server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { - sp := newServerPeer(s, c.Permanent) - p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) +func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnectedMsg) { + sp := newServerPeer(s, msg.connReq.Permanent) + outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), msg.connReq.Addr.String()) if err != nil { - srvrLog.Debugf("Cannot create outbound peer %s: %s", c.Addr, err) - s.connManager.Disconnect(c.ID()) + srvrLog.Debugf("Cannot create outbound peer %s: %s", msg.connReq.Addr, err) + s.connManager.Disconnect(msg.connReq.ID()) } - sp.Peer = p - sp.connReq = c - sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) - sp.AssociateConnection(conn) + sp.Peer = outboundPeer + sp.connReq = msg.connReq + sp.isWhitelisted = isWhitelisted(msg.conn.RemoteAddr()) + sp.AssociateConnection(msg.conn) spawn(func() { s.peerDoneHandler(sp) }) s.addrManager.Attempt(sp.NA()) + state.outboundGroups[addrmgr.GroupKey(sp.NA())]++ } // peerDoneHandler handles peer disconnects by notifiying the server that it's @@ -1226,6 +1232,9 @@ out: return true }) break out + + case opcMsg := <-s.newOutboundConnection: + s.outboundPeerConnected(state, opcMsg) } } @@ -1601,6 +1610,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params broadcast: make(chan broadcastMsg, config.MainConfig().MaxPeers), quit: make(chan struct{}), modifyRebroadcastInv: make(chan interface{}), + newOutboundConnection: make(chan *outboundPeerConnectedMsg), nat: nat, db: db, TimeSource: blockdag.NewMedianTime(), @@ -1757,7 +1767,6 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params addrString := addrmgr.NetAddressKey(addr.NetAddress()) return addrStringToNetAddr(addrString) } - return nil, errors.New("no valid connect address") } } @@ -1773,8 +1782,13 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params RetryDuration: connectionRetryInterval, TargetOutbound: uint32(targetOutbound), Dial: serverutils.BTCDDial, - OnConnection: s.outboundPeerConnected, - GetNewAddress: newAddressFunc, + OnConnection: func(c *connmgr.ConnReq, conn net.Conn) { + s.newOutboundConnection <- &outboundPeerConnectedMsg{ + connReq: c, + conn: conn, + } + }, + GetNewAddress: newAddressFunc, }) if err != nil { return nil, err