From 4d80750afec174cb5e748a146eeb0981767d7193 Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Wed, 30 Oct 2013 17:22:35 +0000 Subject: [PATCH] Move all local data to peerhandler into a peerState structure Pass peerstate around instead of indivdual bits. --- server.go | 105 +++++++++++++++++++++++++++++------------------------- 1 file changed, 57 insertions(+), 48 deletions(-) diff --git a/server.go b/server.go index 60b2b2ca0..469befdbb 100644 --- a/server.go +++ b/server.go @@ -69,9 +69,16 @@ type server struct { db btcdb.Db } +type peerState struct { + peers *list.List + banned map[string]time.Time + outboundPeers int + maxOutboundPeers int +} + // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. -func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) bool { +func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { if p == nil { return false } @@ -91,7 +98,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p.Shutdown() return false } - if banEnd, ok := banned[host]; ok { + if banEnd, ok := state.banned[host]; ok { if time.Now().Before(banEnd) { log.Debugf("SRVR: Peer %s is banned for another %v - "+ "disconnecting", host, banEnd.Sub(time.Now())) @@ -100,13 +107,13 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, } log.Infof("SRVR: Peer %s is no longer banned", host) - delete(banned, host) + delete(state.banned, host) } // TODO: Check for max peers from a single IP. // Limit max number of total peers. - if peers.Len() >= cfg.MaxPeers { + if state.peers.Len() >= cfg.MaxPeers { log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ "peer %s", cfg.MaxPeers, p) p.Shutdown() @@ -117,7 +124,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // Add the new peer and start it. log.Debugf("SRVR: New peer %s", p) - peers.PushBack(p) + state.peers.PushBack(p) if p.inbound { p.Start() } @@ -127,8 +134,8 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // handleDonePeerMsg deals with peers that have signalled they are done. It is // invoked from the peerHandler goroutine. -func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { - for e := peers.Front(); e != nil; e = e.Next() { +func (s *server) handleDonePeerMsg(state *peerState, p *peer) bool { + for e := state.peers.Front(); e != nil; e = e.Next() { if e.Value == p { // Issue an asynchronous reconnect if the peer was a // persistent outbound connection. @@ -137,7 +144,7 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { e.Value = newOutboundPeer(s, p.addr, true) return false } - peers.Remove(e) + state.peers.Remove(e) log.Debugf("SRVR: Removed peer %s", p) return true } @@ -148,7 +155,7 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { // handleBanPeerMsg deals with banning peers. It is invoked from the // peerHandler goroutine. -func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { +func (s *server) handleBanPeerMsg(state *peerState, p *peer) { host, _, err := net.SplitHostPort(p.addr) if err != nil { log.Debugf("SRVR: can't split ban peer %s %v", p.addr, err) @@ -157,16 +164,16 @@ func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { direction := directionString(p.inbound) log.Infof("SRVR: Banned peer %s (%s) for %v", host, direction, cfg.BanDuration) - banned[host] = time.Now().Add(cfg.BanDuration) + state.banned[host] = time.Now().Add(cfg.BanDuration) } // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. -func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) { +func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { // Loop through all connected peers and relay the inventory to those // which are not already known to have it. - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { p := e.Value.(*peer) if !p.Connected() { continue @@ -181,8 +188,8 @@ func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) { // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // from the peerHandler goroutine. -func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { - for e := peers.Front(); e != nil; e = e.Next() { +func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { + for e := state.peers.Front(); e != nil; e = e.Next() { excluded := false for _, p := range bmsg.excludePeers { if e.Value == p { @@ -238,11 +245,11 @@ type delNodeMsg struct { // handleQuery is the central handler for all queries and commands from other // goroutines related to peer state. -func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers map[string]time.Time) { +func (s *server) handleQuery(querymsg interface{}, state *peerState) { switch msg := querymsg.(type) { case getConnCountMsg: nconnected := 0 - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if peer.Connected() { nconnected++ @@ -251,8 +258,8 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers msg.reply <- nconnected case getPeerInfoMsg: - infos := make([]*PeerInfo, 0, peers.Len()) - for e := peers.Front(); e != nil; e = e.Next() { + infos := make([]*PeerInfo, 0, state.peers.Len()) + for e := state.peers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if !peer.Connected() { continue @@ -281,7 +288,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers msg.reply <- infos case addNodeMsg: // TODO(oga) really these checks only apply to permanent peers. - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if peer.addr == msg.addr { msg.reply <- errors.New("peer already connected") @@ -289,7 +296,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers } } // TODO(oga) if too many, nuke a non-perm peer. - if s.handleAddPeerMsg(peers, bannedPeers, + if s.handleAddPeerMsg(state, newOutboundPeer(s, msg.addr, msg.permanent)) { msg.reply <- nil } else { @@ -299,7 +306,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers case delNodeMsg: found := false // TODO(oga) really these checks only apply to permanent peers. - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if peer.addr == msg.addr { peer.persistent = false // XXX hack! @@ -391,12 +398,14 @@ func (s *server) peerHandler() { s.blockManager.Start() log.Tracef("SRVR: Starting peer handler") - peers := list.New() - bannedPeers := make(map[string]time.Time) - outboundPeers := 0 - maxOutbound := defaultMaxOutbound - if cfg.MaxPeers < maxOutbound { - maxOutbound = cfg.MaxPeers + state := &peerState{ + peers: list.New(), + persistentPeers: list.New(), + banned: make(map[string]time.Time), + maxOutboundPeers: defaultMaxOutbound, + } + if cfg.MaxPeers < state.maxOutboundPeers { + state.maxOutboundPeers = cfg.MaxPeers } // Add peers discovered through DNS to the address manager. @@ -408,9 +417,8 @@ func (s *server) peerHandler() { permanentPeers = cfg.AddPeers } for _, addr := range permanentPeers { - if s.handleAddPeerMsg(peers, bannedPeers, - newOutboundPeer(s, addr, true)) { - outboundPeers++ + if s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) { + state.outboundPeers++ } } @@ -422,42 +430,41 @@ out: select { // New peers connected to the server. case p := <-s.newPeers: - if s.handleAddPeerMsg(peers, bannedPeers, p) && - !p.inbound { - outboundPeers++ + if s.handleAddPeerMsg(state, p) && !p.inbound { + state.outboundPeers++ } // Disconnected peers. case p := <-s.donePeers: // handleDonePeerMsg return true if it removed a peer - if s.handleDonePeerMsg(peers, p) { - outboundPeers-- + if s.handleDonePeerMsg(state, p) { + state.outboundPeers-- } // Peer to ban. case p := <-s.banPeers: - s.handleBanPeerMsg(bannedPeers, p) + s.handleBanPeerMsg(state, p) // New inventory to potentially be relayed to other peers. case invMsg := <-s.relayInv: - s.handleRelayInvMsg(peers, invMsg) + s.handleRelayInvMsg(state, invMsg) // Message to broadcast to all connected peers except those // which are excluded by the message. case bmsg := <-s.broadcast: - s.handleBroadcastMsg(peers, &bmsg) + s.handleBroadcastMsg(state, &bmsg) // Used by timers below to wake us back up. case <-s.wakeup: // this page left intentionally blank case qmsg := <-s.query: - s.handleQuery(qmsg, peers, bannedPeers) + s.handleQuery(qmsg, state) // Shutdown the peer handler. case <-s.quit: // Shutdown peers. - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { p := e.Value.(*peer) p.Shutdown() } @@ -465,12 +472,13 @@ out: } // Only try connect to more peers if we actually need more - if outboundPeers >= maxOutbound || len(cfg.ConnectPeers) > 0 || + if state.outboundPeers >= state.maxOutboundPeers || + len(cfg.ConnectPeers) > 0 || atomic.LoadInt32(&s.shutdown) != 0 { continue } groups := make(map[string]int) - for e := peers.Front(); e != nil; e = e.Next() { + for e := state.peers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if !peer.inbound { groups[GroupKey(peer.na)]++ @@ -478,14 +486,14 @@ out: } tries := 0 - for outboundPeers < maxOutbound && - peers.Len() < cfg.MaxPeers && + for state.outboundPeers < state.maxOutboundPeers && + state.peers.Len() < cfg.MaxPeers && atomic.LoadInt32(&s.shutdown) == 0 { // We bias like bitcoind does, 10 for no outgoing // up to 90 (8) for the selection of new vs tried //addresses. - nPeers := outboundPeers + nPeers := state.outboundPeers if nPeers > 8 { nPeers = 8 } @@ -532,15 +540,16 @@ out: tries = 0 // any failure will be due to banned peers etc. we have // already checked that we have room for more peers. - if s.handleAddPeerMsg(peers, bannedPeers, + if s.handleAddPeerMsg(state, newOutboundPeer(s, addrStr, false)) { - outboundPeers++ + state.outboundPeers++ groups[key]++ } } // We we need more peers, wake up in ten seconds and try again. - if outboundPeers < maxOutbound && peers.Len() < cfg.MaxPeers { + if state.outboundPeers < state.maxOutboundPeers && + state.peers.Len() < cfg.MaxPeers { time.AfterFunc(10*time.Second, func() { s.wakeup <- true })