From 92a8605b2425fe4621a90fa639cc48329126a8b8 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 12 Sep 2013 14:19:10 -0500 Subject: [PATCH] Continue work on addrmgr and multi-peer. - Remove leftover debug log prints - Increment waitgroup outside of goroutine - Various comment and log message consistency - Combine peer setup and newPeer -> newInboundPeer - Save and load peers.json to/from cfg.DataDir - Only claim addrmgr needs more addresses when it has less than 1000 - Add warning if unkown peer on orphan block. --- addrmanager.go | 62 ++++++++++++++------------- blockmanager.go | 3 ++ peer.go | 108 +++++++++++++++++++++++++----------------------- server.go | 2 +- 4 files changed, 93 insertions(+), 82 deletions(-) diff --git a/addrmanager.go b/addrmanager.go index e8a03a30d..17685e049 100644 --- a/addrmanager.go +++ b/addrmanager.go @@ -12,6 +12,7 @@ import ( "math/rand" "net" "os" + "path/filepath" "strconv" "sync" "time" @@ -20,7 +21,12 @@ import ( const ( // maxAddresses identifies the maximum number of addresses that the // address manager will track. - maxAddresses = 2500 + maxAddresses = 2500 + + // needAddressThreshold is the number of addresses under which the + // address manager will claim to need more addresses. + needAddressThreshold = 1000 + newAddressBufferSize = 50 // dumpAddressInterval is the interval used to dump the address @@ -94,12 +100,11 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) { } // bad returns true if the address in question has not been tried in the last -// minute and meets one of the following -// criteria: -// 1) It claims to be from the future. -// 2) It hasn't been seen in over a month. -// 3) It has failed at least three times and never succeeded. -// 4) It has failed ten times in the last week. +// minute and meets one of the following criteria: +// 1) It claims to be from the future +// 2) It hasn't been seen in over a month +// 3) It has failed at least three times and never succeeded +// 4) It has failed ten times in the last week // All addresses that meet these criteria are assumed to be worthless and not // worth keeping hold of. func bad(ka *knownAddress) bool { @@ -132,7 +137,7 @@ func bad(ka *knownAddress) bool { return false } -// chance returns the selection probability for a known address. The priority +// chance returns the selection probability for a known address. The priority // depends upon how recent the address has been seen, how recent it was last // attempted and how often attempts to connect to it have failed. func chance(ka *knownAddress) float64 { @@ -146,7 +151,7 @@ func chance(ka *knownAddress) float64 { if ka.na.Timestamp.IsZero() { // use unix epoch to match bitcoind. dur = now.Sub(time.Unix(0, 0)) - + } else { dur = now.Sub(ka.na.Timestamp) } @@ -165,12 +170,12 @@ func chance(ka *knownAddress) float64 { c = 600.0 / (600.0 + lastSeen) - // very recent attempts are less likely to be retried. + // Very recent attempts are less likely to be retried. if lastTry > 60.0*10.0 { c *= 0.01 } - // failed attempts deprioritise + // Failed attempts deprioritise. if ka.attempts > 0 { c /= (float64(ka.attempts) * 1.5) } @@ -213,7 +218,7 @@ func (a *AddrManager) expireNew() { // pickTried selects an address from the tried bucket to be evicted. // We just choose the eldest. func (a *AddrManager) pickTried() *list.Element { - var oldest *knownAddress + var oldest *knownAddress var oldestElem *list.Element for e := a.addrTried.Front(); e != nil; e = e.Next() { ka := e.Value.(*knownAddress) @@ -226,6 +231,8 @@ func (a *AddrManager) pickTried() *list.Element { return oldestElem } +// knownAddress tracks information about a known network address that is used +// to determine how viable an address is. type knownAddress struct { na *btcwire.NetAddress attempts int @@ -280,38 +287,36 @@ out: func (a *AddrManager) savePeers() { // May give some way to specify this later. filename := "peers.json" + filePath := filepath.Join(cfg.DataDir, filename) var toSave JsonSave list := a.AddressCacheFlat() - log.Info("LIST ", list) toSave.AddrList = list - w, err := os.Create(filename) + w, err := os.Create(filePath) if err != nil { - log.Error("Error opening file: ", filename, err) + log.Error("Error opening file: ", filePath, err) } enc := json.NewEncoder(w) defer w.Close() enc.Encode(&toSave) - log.Info("Saving peer list.") } // loadPeers loads the known address from the saved file. If empty, missing, or // malformed file, just don't load anything and start fresh func (a *AddrManager) loadPeers() { - log.Info("Loading saved peers") - // May give some way to specify this later. filename := "peers.json" + filePath := filepath.Join(cfg.DataDir, filename) - _, err := os.Stat(filename) + _, err := os.Stat(filePath) if os.IsNotExist(err) { - log.Debugf("%s does not exist.\n", filename) + log.Debugf("%s does not exist.\n", filePath) } else { - r, err := os.Open(filename) + r, err := os.Open(filePath) if err != nil { - log.Error("Error opening file: ", filename, err) + log.Error("Error opening file: ", filePath, err) return } defer r.Close() @@ -320,7 +325,7 @@ func (a *AddrManager) loadPeers() { dec := json.NewDecoder(r) err = dec.Decode(&inList) if err != nil { - log.Error("Error reading:", filename, err) + log.Error("Error reading:", filePath, err) return } log.Debug("Adding ", len(inList.AddrList), " saved peers.") @@ -420,7 +425,7 @@ func (a *AddrManager) AddAddressByIP(addrIP string) { func (a *AddrManager) NeedMoreAddresses() bool { // NumAddresses handles concurrent access for us. - return a.NumAddresses()+1 <= maxAddresses + return a.NumAddresses() < needAddressThreshold } // NumAddresses returns the number of addresses known to the address manager. @@ -474,7 +479,7 @@ func (a *AddrManager) AddressCacheFlat() []string { func NewAddrManager() *AddrManager { am := AddrManager{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), - addrIndex: make(map[string]*knownAddress), + addrIndex: make(map[string]*knownAddress), addrNew: make(map[string]*knownAddress), addrTried: list.New(), quit: make(chan bool), @@ -519,8 +524,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress { for { // Pick a random entry in the list e := a.addrTried.Front() - for i := a.rand.Int63n(int64(a.addrTried.Len())); - i > 0; i-- { + for i := a.rand.Int63n(int64(a.addrTried.Len())); i > 0; i-- { e = e.Next() } ka := e.Value.(*knownAddress) @@ -601,8 +605,8 @@ func (a *AddrManager) Connected(addr *btcwire.NetAddress) { } } -// Good marks the given address as good. To be called after a successful -// connection and version exchange. If the address is unkownto the addresss +// Good marks the given address as good. To be called after a successful +// connection and version exchange. If the address is unknown to the addresss // manager it will be ignored. func (a *AddrManager) Good(addr *btcwire.NetAddress) { a.mtx.Lock() diff --git a/blockmanager.go b/blockmanager.go index 2e6740535..9fc80f008 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -297,6 +297,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { peer.pushGetBlocksMsg(locator, orphanRoot) delete(b.blockPeer, *orphanRoot) break + } else { + log.Warnf("Notification for orphan %v with no peer", + orphanHash) } // A block has been accepted into the block chain. diff --git a/peer.go b/peer.go index d46beed73..d48328a7b 100644 --- a/peer.go +++ b/peer.go @@ -232,14 +232,14 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { } } - var err error - // Set up a netaddress for the peer to be used with addrmanager.. - p.na, err = newNetAddress(p.conn.RemoteAddr(), p.services) + // Set up a NetAddress for the peer to be used with AddrManager. + na, err := newNetAddress(p.conn.RemoteAddr(), p.services) if err != nil { log.Errorf("[PEER] %v", err) p.Disconnect() return } + p.na = na // Send verack. p.outputQueue <- btcwire.NewMsgVerAck() @@ -265,15 +265,18 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Request known addresses if the server address manager needs // more and the peer has a protocol version new enough to // include a timestamp with addresses. - // XXX bitcoind only does this if we have < 1000 addresses, not - // the max of 2400 hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { p.outputQueue <- btcwire.NewMsgGetAddr() } - // Add inbound peer address to the server address manager. + + // Mark the address as a known good address. p.server.addrManager.Good(p.na) } else { + // A peer might not be advertising the same address that it + // actually connected from. One example of why this can happen + // is with NAT. Only add the address to the address manager if + // the addresses agree. if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) { p.server.addrManager.AddAddress(p.na, p.na) p.server.addrManager.Good(p.na) @@ -893,8 +896,8 @@ out: break out } - markConnected := false // Handle each supported message type. + markConnected := false switch msg := rmsg.(type) { case *btcwire.MsgVersion: p.handleVersionMsg(msg) @@ -942,6 +945,9 @@ out: log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", rmsg.Command()) } + + // Mark the address as currently connected and working as of + // now if one of the messages that trigger if markConnected && !p.disconnect { if p.na == nil { log.Warnf("we're getting stuff before we " + @@ -1085,39 +1091,17 @@ func (p *peer) Shutdown() { p.wg.Wait() } -// newPeer returns a new bitcoin peer for the provided server and connection. -// Use start to begin processing incoming and outgoing messages. -func newPeer(s *server, conn net.Conn) *peer { +// newPeerBase returns a new base bitcoin peer for the provided server and +// inbound flag. This is used by the newInboundPeer and newOutboundPeer +// functions to perform base setup needed by both types of peers. +func newPeerBase(s *server, inbound bool) *peer { p := peer{ server: s, protocolVersion: btcwire.ProtocolVersion, btcnet: s.btcnet, services: btcwire.SFNodeNetwork, - conn: conn, - addr: conn.RemoteAddr().String(), timeConnected: time.Now(), - inbound: true, - persistent: false, - knownAddresses: make(map[string]bool), - outputQueue: make(chan btcwire.Message, outputBufferSize), - quit: make(chan bool), - } - return &p -} - -// newOutbountPeer returns a new bitcoin peer for the provided server and -// address and connects to it asynchronously. If the connetion is successful -// then the peer will also be started. -func newOutboundPeer(s *server, addr string, persistent bool) *peer { - p := peer{ - server: s, - protocolVersion: btcwire.ProtocolVersion, - btcnet: s.btcnet, - services: btcwire.SFNodeNetwork, - addr: addr, - timeConnected: time.Now(), - inbound: false, - persistent: persistent, + inbound: inbound, knownAddresses: make(map[string]bool), knownInventory: NewMruInventoryMap(maxKnownInventory), requestQueue: list.New(), @@ -1127,29 +1111,50 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { blockProcessed: make(chan bool, 1), quit: make(chan bool), } - // set up p.na with a temporary address that we are connecting to with - // faked up service flags. We will replace this with the real one after - // version negotiation is successful. The only failure case here would + return &p +} + +// newPeer returns a new inbound bitcoin peer for the provided server and +// connection. Use Start to begin processing incoming and outgoing messages. +func newInboundPeer(s *server, conn net.Conn) *peer { + p := newPeerBase(s, true) + p.conn = conn + p.addr = conn.RemoteAddr().String() + return p +} + +// newOutbountPeer returns a new outbound bitcoin peer for the provided server and +// address and connects to it asynchronously. If the connection is successful +// then the peer will also be started. +func newOutboundPeer(s *server, addr string, persistent bool) *peer { + p := newPeerBase(s, false) + p.addr = addr + p.persistent = persistent + + // Setup p.na with a temporary address that we are connecting to with + // faked up service flags. We will replace this with the real one after + // version negotiation is successful. The only failure case here would // be if the string was incomplete for connection so can't be split - // into address and port, and thus this would be invalid anyway. In - // which case we return nil to be handled by the caller. - // This must be done before we fork off the goroutine because as soon - // as this function returns the peer must have a valid netaddress. + // into address and port, and thus this would be invalid anyway. In + // which case we return nil to be handled by the caller. This must be + // done before we fork off the goroutine because as soon as this + // function returns the peer must have a valid netaddress. ip, portStr, err := net.SplitHostPort(addr) if err != nil { - log.Errorf("tried to create a new outbound peer with invalid "+ + log.Errorf("Tried to create a new outbound peer with invalid "+ "address %s: %v", addr, err) return nil } port, err := strconv.ParseUint(portStr, 10, 16) if err != nil { - log.Errorf("tried to create a new outbound peer with invalid "+ + log.Errorf("Tried to create a new outbound peer with invalid "+ "port %s: %v", portStr, err) return nil } p.na = btcwire.NewNetAddressIPPort(net.ParseIP(ip), uint16(port), 0) + p.wg.Add(1) go func() { // Select which dial method to call depending on whether or // not a proxy is configured. Also, add proxy information to @@ -1161,7 +1166,6 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { dial = proxy.Dial faddr = fmt.Sprintf("%s via proxy %s", addr, cfg.Proxy) } - p.wg.Add(1) // Attempt to connect to the peer. If the connection fails and // this is a persistent connection, retry after the retry @@ -1170,10 +1174,10 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { log.Debugf("[SRVR] Attempting to connect to %s", faddr) conn, err := dial("tcp", addr) if err != nil { - log.Errorf("[SRVR] failed to connect to %s: %v", + log.Errorf("[SRVR] Failed to connect to %s: %v", faddr, err) if !persistent { - p.server.donePeers <- &p + p.server.donePeers <- p p.wg.Done() return } @@ -1183,11 +1187,10 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { continue } - // while we were sleeping trying to get connect then - // the server may have scheduled a shutdown. In that - // case we ditch the connection immediately. + // While we were sleeping trying to connect, the server + // may have scheduled a shutdown. In that case ditch + // the peer immediately. if !s.shutdown { - p.server.addrManager.Attempt(p.na) // Connection was successful so log it and start peer. @@ -1195,8 +1198,9 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { p.conn = conn p.Start() } else { - p.server.donePeers <- &p + p.server.donePeers <- p } + // We are done here, Start() will have grabbed // additional waitgroup entries if we are not shutting // down. @@ -1204,5 +1208,5 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { return } }() - return &p + return p } diff --git a/server.go b/server.go index c5135d78d..b58409954 100644 --- a/server.go +++ b/server.go @@ -213,7 +213,7 @@ func (s *server) listenHandler(listener net.Listener) { } continue } - s.AddPeer(newPeer(s, conn)) + s.AddPeer(newInboundPeer(s, conn)) } s.wg.Done() log.Tracef("[SRVR] Listener handler done for %s", listener.Addr())