diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index 4bf5cb03e..8b1dacdf6 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -109,15 +109,6 @@ type relayMsg struct { data interface{} } -type outboundPeerConnectedMsg struct { - connReq *connmgr.ConnReq - conn net.Conn -} - -type outboundPeerConnectionFailedMsg struct { - connReq *connmgr.ConnReq -} - // Peer extends the peer to maintain state shared by the server and // the blockmanager. type Peer struct { @@ -229,19 +220,17 @@ type Server struct { DAG *blockdag.BlockDAG TxMemPool *mempool.TxPool - modifyRebroadcastInv chan interface{} - newPeers chan *Peer - donePeers chan *Peer - banPeers chan *Peer - newOutboundConnection chan *outboundPeerConnectedMsg - newOutboundConnectionFailed chan *outboundPeerConnectionFailedMsg - Query chan interface{} - relayInv chan relayMsg - broadcast chan broadcastMsg - wg sync.WaitGroup - nat serverutils.NAT - TimeSource blockdag.TimeSource - services wire.ServiceFlag + modifyRebroadcastInv chan interface{} + newPeers chan *Peer + donePeers chan *Peer + banPeers chan *Peer + Query chan interface{} + relayInv chan relayMsg + broadcast chan broadcastMsg + wg sync.WaitGroup + nat serverutils.NAT + TimeSource blockdag.TimeSource + services wire.ServiceFlag // We add to quitWaitGroup before every instance in which we wait for // the quit channel so that all those instances finish before we shut @@ -977,17 +966,17 @@ func (s *Server) inboundPeerConnected(conn net.Conn) { // peer instance, associates it with the relevant state such as the connection // request instance and the connection itself, and finally notifies the address // manager of the attempt. -func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnectedMsg) { - sp := newServerPeer(s, msg.connReq.Permanent) - outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), msg.connReq.Addr.String()) +func (s *Server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) { + sp := newServerPeer(s, connReq.Permanent) + outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), connReq.Addr.String()) if err != nil { - srvrLog.Debugf("Cannot create outbound peer %s: %s", msg.connReq.Addr, err) - s.connManager.Disconnect(msg.connReq.ID()) + srvrLog.Debugf("Cannot create outbound peer %s: %s", connReq.Addr, err) + s.connManager.Disconnect(connReq.ID()) } sp.Peer = outboundPeer - sp.connReq = msg.connReq + sp.connReq = connReq - s.peerConnected(sp, msg.conn) + s.peerConnected(sp, conn) s.addrManager.Attempt(sp.NA()) } @@ -1012,20 +1001,20 @@ func (s *Server) peerConnected(sp *Peer, conn net.Conn) { // outboundPeerConnected is invoked by the connection manager when a new // outbound connection failed to be established. -func (s *Server) outboundPeerConnectionFailed(msg *outboundPeerConnectionFailedMsg) { +func (s *Server) outboundPeerConnectionFailed(connReq *connmgr.ConnReq) { // If the connection request has no address // associated to it, do nothing. - if msg.connReq.Addr == nil { + if connReq.Addr == nil { return } - host, portStr, err := net.SplitHostPort(msg.connReq.Addr.String()) + host, portStr, err := net.SplitHostPort(connReq.Addr.String()) if err != nil { - srvrLog.Debugf("Cannot extract address host and port %s: %s", msg.connReq.Addr, err) + srvrLog.Debugf("Cannot extract address host and port %s: %s", connReq.Addr, err) } port, err := strconv.ParseUint(portStr, 10, 16) if err != nil { - srvrLog.Debugf("Cannot parse port %s: %s", msg.connReq.Addr, err) + srvrLog.Debugf("Cannot parse port %s: %s", connReq.Addr, err) } // defaultServices is used here because Attempt makes no use @@ -1137,12 +1126,6 @@ out: }) s.quitWaitGroup.Done() break out - - case opcMsg := <-s.newOutboundConnection: - s.outboundPeerConnected(state, opcMsg) - - case opcfMsg := <-s.newOutboundConnectionFailed: - s.outboundPeerConnectionFailed(opcfMsg) } } @@ -1497,23 +1480,21 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers s := Server{ - DAGParams: dagParams, - addrManager: amgr, - newPeers: make(chan *Peer, maxPeers), - donePeers: make(chan *Peer, maxPeers), - banPeers: make(chan *Peer, maxPeers), - Query: make(chan interface{}), - relayInv: make(chan relayMsg, maxPeers), - broadcast: make(chan broadcastMsg, maxPeers), - quit: make(chan struct{}), - modifyRebroadcastInv: make(chan interface{}), - newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers), - newOutboundConnectionFailed: make(chan *outboundPeerConnectionFailedMsg, config.ActiveConfig().TargetOutboundPeers), - nat: nat, - TimeSource: blockdag.NewTimeSource(), - services: services, - SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize), - notifyNewTransactions: notifyNewTransactions, + DAGParams: dagParams, + addrManager: amgr, + newPeers: make(chan *Peer, maxPeers), + donePeers: make(chan *Peer, maxPeers), + banPeers: make(chan *Peer, maxPeers), + Query: make(chan interface{}), + relayInv: make(chan relayMsg, maxPeers), + broadcast: make(chan broadcastMsg, maxPeers), + quit: make(chan struct{}), + modifyRebroadcastInv: make(chan interface{}), + nat: nat, + TimeSource: blockdag.NewTimeSource(), + services: services, + SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize), + notifyNewTransactions: notifyNewTransactions, } // Create indexes if needed. @@ -1576,23 +1557,14 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch // Create a connection manager. cmgr, err := connmgr.New(&connmgr.Config{ - Listeners: listeners, - OnAccept: s.inboundPeerConnected, - RetryDuration: connectionRetryInterval, - TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers), - Dial: serverutils.KaspadDial, - OnConnection: func(c *connmgr.ConnReq, conn net.Conn) { - s.newOutboundConnection <- &outboundPeerConnectedMsg{ - connReq: c, - conn: conn, - } - }, - OnConnectionFailed: func(c *connmgr.ConnReq) { - s.newOutboundConnectionFailed <- &outboundPeerConnectionFailedMsg{ - connReq: c, - } - }, - AddrManager: s.addrManager, + Listeners: listeners, + OnAccept: s.inboundPeerConnected, + RetryDuration: connectionRetryInterval, + TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers), + Dial: serverutils.KaspadDial, + OnConnection: s.outboundPeerConnected, + OnConnectionFailed: s.outboundPeerConnectionFailed, + AddrManager: s.addrManager, }) if err != nil { return nil, err