[DEV-336] Update peer package to accommodate partial nodes (#176)

* [DEV-336] Split addresses in address manager by subnetwork id

* [DEV-336] Load DAG with subnetwork from the config file

* [DEV-336] Remove redundant checks in updateAddrNew and updateAddrTried
This commit is contained in:
Ori Newman 2019-01-28 13:55:58 +02:00 committed by stasatdaglabs
parent a82e6ae24a
commit 1e09d470f7
5 changed files with 269 additions and 164 deletions

View File

@ -22,10 +22,15 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/wire" "github.com/daglabs/btcd/wire"
) )
type newBucket [newBucketCount]map[string]*KnownAddress
type triedBucket [triedBucketCount]*list.List
// AddrManager provides a concurrency safe address manager for caching potential // AddrManager provides a concurrency safe address manager for caching potential
// peers on the bitcoin network. // peers on the bitcoin network.
type AddrManager struct { type AddrManager struct {
@ -35,21 +40,23 @@ type AddrManager struct {
rand *rand.Rand rand *rand.Rand
key [32]byte key [32]byte
addrIndex map[string]*KnownAddress // address key to ka for all addrs. addrIndex map[string]*KnownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*KnownAddress addrNew map[subnetworkid.SubnetworkID]*newBucket
addrTried [triedBucketCount]*list.List addrTried map[subnetworkid.SubnetworkID]*triedBucket
started int32 started int32
shutdown int32 shutdown int32
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
nTried int nTried map[subnetworkid.SubnetworkID]int
nNew int nNew map[subnetworkid.SubnetworkID]int
lamtx sync.Mutex lamtx sync.Mutex
localAddresses map[string]*localAddress localAddresses map[string]*localAddress
localSubnetworkID *subnetworkid.SubnetworkID
} }
type serializedKnownAddress struct { type serializedKnownAddress struct {
Addr string Addr string
Src string Src string
SubnetworkID string
Attempts int Attempts int
TimeStamp int64 TimeStamp int64
LastAttempt int64 LastAttempt int64
@ -57,12 +64,15 @@ type serializedKnownAddress struct {
// no refcount or tried, that is available from context. // no refcount or tried, that is available from context.
} }
type serializedNewBucket [newBucketCount][]string
type serializedTriedBucket [triedBucketCount][]string
type serializedAddrManager struct { type serializedAddrManager struct {
Version int Version int
Key [32]byte Key [32]byte
Addresses []*serializedKnownAddress Addresses []*serializedKnownAddress
NewBuckets [newBucketCount][]string // string is NetAddressKey NewBuckets map[string]*serializedNewBucket // string is NetAddressKey
TriedBuckets [triedBucketCount][]string TriedBuckets map[string]*serializedTriedBucket
} }
type localAddress struct { type localAddress struct {
@ -206,49 +216,72 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddress) {
// updated elsewhere in the addrmanager code and would otherwise // updated elsewhere in the addrmanager code and would otherwise
// change the actual netaddress on the peer. // change the actual netaddress on the peer.
netAddrCopy := *netAddr netAddrCopy := *netAddr
ka = &KnownAddress{na: &netAddrCopy, srcAddr: srcAddr} ka = &KnownAddress{na: &netAddrCopy, srcAddr: srcAddr, subnetworkID: &wire.SubnetworkIDUnknown}
a.addrIndex[addr] = ka a.addrIndex[addr] = ka
a.nNew++ a.nNew[wire.SubnetworkIDUnknown]++
// XXX time penalty? // XXX time penalty?
} }
bucket := a.getNewBucket(netAddr, srcAddr) bucket := a.getNewBucket(netAddr, srcAddr)
// Already exists? // Already exists?
if _, ok := a.addrNew[bucket][addr]; ok { if a.addrNew[*ka.subnetworkID] != nil {
if _, ok := a.addrNew[*ka.subnetworkID][bucket][addr]; ok {
return return
} }
}
// Enforce max addresses. // Enforce max addresses.
if len(a.addrNew[bucket]) > newBucketSize { if a.addrNew[*ka.subnetworkID] != nil && len(a.addrNew[*ka.subnetworkID][bucket]) > newBucketSize {
log.Tracef("new bucket is full, expiring old") log.Tracef("new bucket is full, expiring old")
a.expireNew(bucket) a.expireNew(bucket)
} }
// Add to new bucket. // Add to new bucket.
ka.refs++ ka.refs++
a.addrNew[bucket][addr] = ka a.updateAddrNew(bucket, addr, ka)
log.Tracef("Added new address %s for a total of %d addresses", addr, log.Tracef("Added new address %s for a total of %d addresses", addr,
a.nTried+a.nNew) a.nTried[*ka.subnetworkID]+a.nNew[*ka.subnetworkID])
}
func (a *AddrManager) updateAddrNew(bucket int, addr string, ka *KnownAddress) {
if _, ok := a.addrNew[*ka.subnetworkID]; !ok {
a.addrNew[*ka.subnetworkID] = &newBucket{}
for i := range a.addrNew[*ka.subnetworkID] {
a.addrNew[*ka.subnetworkID][i] = make(map[string]*KnownAddress)
}
}
a.addrNew[*ka.subnetworkID][bucket][addr] = ka
}
func (a *AddrManager) updateAddrTried(bucket int, ka *KnownAddress) {
if _, ok := a.addrTried[*ka.subnetworkID]; !ok {
a.addrTried[*ka.subnetworkID] = &triedBucket{}
for i := range a.addrTried[*ka.subnetworkID] {
a.addrTried[*ka.subnetworkID][i] = list.New()
}
}
a.addrTried[*ka.subnetworkID][bucket].PushBack(ka)
} }
// expireNew makes space in the new buckets by expiring the really bad entries. // expireNew makes space in the new buckets by expiring the really bad entries.
// If no bad entries are available we look at a few and remove the oldest. // If no bad entries are available we look at a few and remove the oldest.
func (a *AddrManager) expireNew(bucket int) { func (a *AddrManager) expireNew(bucket int) {
for subnetworkID := range a.addrNew {
// First see if there are any entries that are so bad we can just throw // First see if there are any entries that are so bad we can just throw
// them away. otherwise we throw away the oldest entry in the cache. // them away. otherwise we throw away the oldest entry in the cache.
// Bitcoind here chooses four random and just throws the oldest of // Bitcoind here chooses four random and just throws the oldest of
// those away, but we keep track of oldest in the initial traversal and // those away, but we keep track of oldest in the initial traversal and
// use that information instead. // use that information instead.
var oldest *KnownAddress var oldest *KnownAddress
for k, v := range a.addrNew[bucket] { for k, v := range a.addrNew[subnetworkID][bucket] {
if v.isBad() { if v.isBad() {
log.Tracef("expiring bad address %v", k) log.Tracef("expiring bad address %v", k)
delete(a.addrNew[bucket], k) delete(a.addrNew[subnetworkID][bucket], k)
v.refs-- v.refs--
if v.refs == 0 { if v.refs == 0 {
a.nNew-- a.nNew[subnetworkID]--
delete(a.addrIndex, k) delete(a.addrIndex, k)
} }
continue continue
@ -264,22 +297,24 @@ func (a *AddrManager) expireNew(bucket int) {
key := NetAddressKey(oldest.na) key := NetAddressKey(oldest.na)
log.Tracef("expiring oldest address %v", key) log.Tracef("expiring oldest address %v", key)
delete(a.addrNew[bucket], key) delete(a.addrNew[subnetworkID][bucket], key)
oldest.refs-- oldest.refs--
if oldest.refs == 0 { if oldest.refs == 0 {
a.nNew-- a.nNew[subnetworkID]--
delete(a.addrIndex, key) delete(a.addrIndex, key)
} }
} }
}
} }
// pickTried selects an address from the tried bucket to be evicted. // pickTried selects an address from the tried bucket to be evicted.
// We just choose the eldest. Bitcoind selects 4 random entries and throws away // We just choose the eldest. Bitcoind selects 4 random entries and throws away
// the older of them. // the older of them.
func (a *AddrManager) pickTried(bucket int) *list.Element { func (a *AddrManager) pickTried(subnetworkID *subnetworkid.SubnetworkID, bucket int) *list.Element {
var oldest *KnownAddress var oldest *KnownAddress
var oldestElem *list.Element var oldestElem *list.Element
for e := a.addrTried[bucket].Front(); e != nil; e = e.Next() { for e := a.addrTried[*subnetworkID][bucket].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress) ka := e.Value.(*KnownAddress)
if oldest == nil || oldest.na.Timestamp.After(ka.na.Timestamp) { if oldest == nil || oldest.na.Timestamp.After(ka.na.Timestamp) {
oldestElem = e oldestElem = e
@ -369,6 +404,7 @@ func (a *AddrManager) savePeers() {
for k, v := range a.addrIndex { for k, v := range a.addrIndex {
ska := new(serializedKnownAddress) ska := new(serializedKnownAddress)
ska.Addr = k ska.Addr = k
ska.SubnetworkID = v.subnetworkID.String()
ska.TimeStamp = v.na.Timestamp.Unix() ska.TimeStamp = v.na.Timestamp.Unix()
ska.Src = NetAddressKey(v.srcAddr) ska.Src = NetAddressKey(v.srcAddr)
ska.Attempts = v.attempts ska.Attempts = v.attempts
@ -379,23 +415,37 @@ func (a *AddrManager) savePeers() {
sam.Addresses[i] = ska sam.Addresses[i] = ska
i++ i++
} }
for i := range a.addrNew {
sam.NewBuckets[i] = make([]string, len(a.addrNew[i])) sam.NewBuckets = make(map[string]*serializedNewBucket)
for subnetworkID := range a.addrNew {
subnetworkIDStr := subnetworkID.String()
sam.NewBuckets[subnetworkIDStr] = &serializedNewBucket{}
for i := range a.addrNew[subnetworkID] {
sam.NewBuckets[subnetworkIDStr][i] = make([]string, len(a.addrNew[subnetworkID][i]))
j := 0 j := 0
for k := range a.addrNew[i] { for k := range a.addrNew[subnetworkID][i] {
sam.NewBuckets[i][j] = k sam.NewBuckets[subnetworkIDStr][i][j] = k
j++ j++
} }
} }
for i := range a.addrTried { }
sam.TriedBuckets[i] = make([]string, a.addrTried[i].Len())
sam.TriedBuckets = make(map[string]*serializedTriedBucket)
for subnetworkID := range a.addrTried {
subnetworkIDStr := subnetworkID.String()
sam.TriedBuckets[subnetworkIDStr] = &serializedTriedBucket{}
for i := range a.addrTried[subnetworkID] {
sam.TriedBuckets[subnetworkIDStr][i] = make([]string, a.addrTried[subnetworkID][i].Len())
j := 0 j := 0
for e := a.addrTried[i].Front(); e != nil; e = e.Next() { for e := a.addrTried[subnetworkID][i].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress) ka := e.Value.(*KnownAddress)
sam.TriedBuckets[i][j] = NetAddressKey(ka.na) sam.TriedBuckets[subnetworkIDStr][i][j] = NetAddressKey(ka.na)
j++ j++
} }
} }
}
w, err := os.Create(a.peersFile) w, err := os.Create(a.peersFile)
if err != nil { if err != nil {
@ -428,7 +478,7 @@ func (a *AddrManager) loadPeers() {
a.reset() a.reset()
return return
} }
log.Infof("Loaded %d addresses from file '%s'", a.numAddresses(), a.peersFile) log.Infof("Loaded %d addresses from file '%s'", a.totalNumAddresses(), a.peersFile)
} }
func (a *AddrManager) deserializePeers(filePath string) error { func (a *AddrManager) deserializePeers(filePath string) error {
@ -468,14 +518,24 @@ func (a *AddrManager) deserializePeers(filePath string) error {
return fmt.Errorf("failed to deserialize netaddress "+ return fmt.Errorf("failed to deserialize netaddress "+
"%s: %v", v.Src, err) "%s: %v", v.Src, err)
} }
ka.subnetworkID, err = subnetworkid.NewFromStr(v.SubnetworkID)
if err != nil {
return fmt.Errorf("failed to deserialize subnetwork id "+
"%s: %v", v.SubnetworkID, err)
}
ka.attempts = v.Attempts ka.attempts = v.Attempts
ka.lastattempt = time.Unix(v.LastAttempt, 0) ka.lastattempt = time.Unix(v.LastAttempt, 0)
ka.lastsuccess = time.Unix(v.LastSuccess, 0) ka.lastsuccess = time.Unix(v.LastSuccess, 0)
a.addrIndex[NetAddressKey(ka.na)] = ka a.addrIndex[NetAddressKey(ka.na)] = ka
} }
for i := range sam.NewBuckets { for subnetworkIDStr := range sam.NewBuckets {
for _, val := range sam.NewBuckets[i] { subnetworkID, err := subnetworkid.NewFromStr(subnetworkIDStr)
if err != nil {
return err
}
for i := range sam.NewBuckets[subnetworkIDStr] {
for _, val := range sam.NewBuckets[subnetworkIDStr][i] {
ka, ok := a.addrIndex[val] ka, ok := a.addrIndex[val]
if !ok { if !ok {
return fmt.Errorf("newbucket contains %s but "+ return fmt.Errorf("newbucket contains %s but "+
@ -483,14 +543,20 @@ func (a *AddrManager) deserializePeers(filePath string) error {
} }
if ka.refs == 0 { if ka.refs == 0 {
a.nNew++ a.nNew[*subnetworkID]++
} }
ka.refs++ ka.refs++
a.addrNew[i][val] = ka a.addrNew[*subnetworkID][i][val] = ka
} }
} }
for i := range sam.TriedBuckets { }
for _, val := range sam.TriedBuckets[i] { for subnetworkIDStr := range sam.NewBuckets {
subnetworkID, err := subnetworkid.NewFromStr(subnetworkIDStr)
if err != nil {
return err
}
for i := range sam.TriedBuckets[subnetworkIDStr] {
for _, val := range sam.TriedBuckets[subnetworkIDStr][i] {
ka, ok := a.addrIndex[val] ka, ok := a.addrIndex[val]
if !ok { if !ok {
return fmt.Errorf("Newbucket contains %s but "+ return fmt.Errorf("Newbucket contains %s but "+
@ -498,8 +564,9 @@ func (a *AddrManager) deserializePeers(filePath string) error {
} }
ka.tried = true ka.tried = true
a.nTried++ a.nTried[*subnetworkID]++
a.addrTried[i].PushBack(ka) a.addrTried[*subnetworkID][i].PushBack(ka)
}
} }
} }
@ -609,17 +676,30 @@ func (a *AddrManager) AddAddressByIP(addrIP string) error {
return nil return nil
} }
// NumAddresses returns the number of addresses known to the address manager. // numAddresses returns the number of addresses that belongs to a specific subnetwork id
func (a *AddrManager) numAddresses() int { // which are known to the address manager.
return a.nTried + a.nNew func (a *AddrManager) numAddresses(subnetworkID *subnetworkid.SubnetworkID) int {
return a.nTried[*subnetworkID] + a.nNew[*subnetworkID]
} }
// NumAddresses returns the number of addresses known to the address manager. // totalNumAddresses returns the number of addresses known to the address manager.
func (a *AddrManager) NumAddresses() int { func (a *AddrManager) totalNumAddresses() int {
total := 0
for _, numAddresses := range a.nTried {
total += numAddresses
}
for _, numAddresses := range a.nNew {
total += numAddresses
}
return total
}
// TotalNumAddresses returns the number of addresses known to the address manager.
func (a *AddrManager) TotalNumAddresses() int {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
return a.numAddresses() return a.totalNumAddresses()
} }
// NeedMoreAddresses returns whether or not the address manager needs more // NeedMoreAddresses returns whether or not the address manager needs more
@ -628,7 +708,7 @@ func (a *AddrManager) NeedMoreAddresses() bool {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
return a.numAddresses() < needAddressThreshold return a.numAddresses(a.localSubnetworkID)+a.numAddresses(&wire.SubnetworkIDUnknown) < needAddressThreshold
} }
// AddressCache returns the current address cache. It must be treated as // AddressCache returns the current address cache. It must be treated as
@ -673,12 +753,11 @@ func (a *AddrManager) reset() {
// fill key with bytes from a good random source. // fill key with bytes from a good random source.
io.ReadFull(crand.Reader, a.key[:]) io.ReadFull(crand.Reader, a.key[:])
for i := range a.addrNew { a.addrNew = make(map[subnetworkid.SubnetworkID]*newBucket)
a.addrNew[i] = make(map[string]*KnownAddress) a.addrTried = make(map[subnetworkid.SubnetworkID]*triedBucket)
}
for i := range a.addrTried { a.nNew = make(map[subnetworkid.SubnetworkID]int)
a.addrTried[i] = list.New() a.nTried = make(map[subnetworkid.SubnetworkID]int)
}
} }
// HostToNetAddress returns a netaddress given a host address. If the address // HostToNetAddress returns a netaddress given a host address. If the address
@ -742,26 +821,27 @@ func (a *AddrManager) GetAddress() *KnownAddress {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
if a.numAddresses() == 0 { subnetworkID := *a.localSubnetworkID
return nil if a.nTried[subnetworkID] == 0 && a.nNew[subnetworkID] == 0 {
subnetworkID = wire.SubnetworkIDUnknown
} }
// Use a 50% chance for choosing between tried and new table entries. // Use a 50% chance for choosing between tried and new table entries.
if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) { if a.nTried[subnetworkID] > 0 && (a.nNew[subnetworkID] == 0 || a.rand.Intn(2) == 0) {
// Tried entry. // Tried entry.
large := 1 << 30 large := 1 << 30
factor := 1.0 factor := 1.0
for { for {
// pick a random bucket. // pick a random bucket.
bucket := a.rand.Intn(len(a.addrTried)) bucket := a.rand.Intn(len(a.addrTried[subnetworkID]))
if a.addrTried[bucket].Len() == 0 { if a.addrTried[subnetworkID][bucket].Len() == 0 {
continue continue
} }
// Pick a random entry in the list // Pick a random entry in the list
e := a.addrTried[bucket].Front() e := a.addrTried[subnetworkID][bucket].Front()
for i := for i :=
a.rand.Int63n(int64(a.addrTried[bucket].Len())); i > 0; i-- { a.rand.Int63n(int64(a.addrTried[subnetworkID][bucket].Len())); i > 0; i-- {
e = e.Next() e = e.Next()
} }
ka := e.Value.(*KnownAddress) ka := e.Value.(*KnownAddress)
@ -773,21 +853,21 @@ func (a *AddrManager) GetAddress() *KnownAddress {
} }
factor *= 1.2 factor *= 1.2
} }
} else { } else if a.nNew[subnetworkID] > 0 {
// new node. // new node.
// XXX use a closure/function to avoid repeating this. // XXX use a closure/function to avoid repeating this.
large := 1 << 30 large := 1 << 30
factor := 1.0 factor := 1.0
for { for {
// Pick a random bucket. // Pick a random bucket.
bucket := a.rand.Intn(len(a.addrNew)) bucket := a.rand.Intn(len(a.addrNew[subnetworkID]))
if len(a.addrNew[bucket]) == 0 { if len(a.addrNew[subnetworkID][bucket]) == 0 {
continue continue
} }
// Then, a random entry in it. // Then, a random entry in it.
var ka *KnownAddress var ka *KnownAddress
nth := a.rand.Intn(len(a.addrNew[bucket])) nth := a.rand.Intn(len(a.addrNew[subnetworkID][bucket]))
for _, value := range a.addrNew[bucket] { for _, value := range a.addrNew[subnetworkID][bucket] {
if nth == 0 { if nth == 0 {
ka = value ka = value
} }
@ -802,6 +882,7 @@ func (a *AddrManager) GetAddress() *KnownAddress {
factor *= 1.2 factor *= 1.2
} }
} }
return nil
} }
func (a *AddrManager) find(addr *wire.NetAddress) *KnownAddress { func (a *AddrManager) find(addr *wire.NetAddress) *KnownAddress {
@ -851,7 +932,7 @@ func (a *AddrManager) Connected(addr *wire.NetAddress) {
// Good marks the given address as good. To be called after a successful // Good marks the given address as good. To be called after a successful
// connection and version exchange. If the address is unknown to the address // connection and version exchange. If the address is unknown to the address
// manager it will be ignored. // manager it will be ignored.
func (a *AddrManager) Good(addr *wire.NetAddress) { func (a *AddrManager) Good(addr *wire.NetAddress, subnetworkID *subnetworkid.SubnetworkID) {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
@ -859,6 +940,7 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
if ka == nil { if ka == nil {
return return
} }
oldSubnetworkID := ka.subnetworkID
// ka.Timestamp is not updated here to avoid leaking information // ka.Timestamp is not updated here to avoid leaking information
// about currently connected peers. // about currently connected peers.
@ -866,6 +948,7 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
ka.lastsuccess = now ka.lastsuccess = now
ka.lastattempt = now ka.lastattempt = now
ka.attempts = 0 ka.attempts = 0
ka.subnetworkID = subnetworkID
// move to tried set, optionally evicting other addresses if neeed. // move to tried set, optionally evicting other addresses if neeed.
if ka.tried { if ka.tried {
@ -878,17 +961,17 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
// record one of the buckets in question and call it the `first' // record one of the buckets in question and call it the `first'
addrKey := NetAddressKey(addr) addrKey := NetAddressKey(addr)
oldBucket := -1 oldBucket := -1
for i := range a.addrNew { for i := range a.addrNew[*oldSubnetworkID] {
// we check for existence so we can record the first one // we check for existence so we can record the first one
if _, ok := a.addrNew[i][addrKey]; ok { if _, ok := a.addrNew[*oldSubnetworkID][i][addrKey]; ok {
delete(a.addrNew[i], addrKey) delete(a.addrNew[*oldSubnetworkID][i], addrKey)
ka.refs-- ka.refs--
if oldBucket == -1 { if oldBucket == -1 {
oldBucket = i oldBucket = i
} }
} }
} }
a.nNew-- a.nNew[*oldSubnetworkID]--
if oldBucket == -1 { if oldBucket == -1 {
// What? wasn't in a bucket after all.... Panic? // What? wasn't in a bucket after all.... Panic?
@ -898,15 +981,15 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
bucket := a.getTriedBucket(ka.na) bucket := a.getTriedBucket(ka.na)
// Room in this tried bucket? // Room in this tried bucket?
if a.addrTried[bucket].Len() < triedBucketSize { if a.nTried[*ka.subnetworkID] == 0 || a.addrTried[*ka.subnetworkID][bucket].Len() < triedBucketSize {
ka.tried = true ka.tried = true
a.addrTried[bucket].PushBack(ka) a.updateAddrTried(bucket, ka)
a.nTried++ a.nTried[*ka.subnetworkID]++
return return
} }
// No room, we have to evict something else. // No room, we have to evict something else.
entry := a.pickTried(bucket) entry := a.pickTried(ka.subnetworkID, bucket)
rmka := entry.Value.(*KnownAddress) rmka := entry.Value.(*KnownAddress)
// First bucket it would have been put in. // First bucket it would have been put in.
@ -914,7 +997,7 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
// If no room in the original bucket, we put it in a bucket we just // If no room in the original bucket, we put it in a bucket we just
// freed up a space in. // freed up a space in.
if len(a.addrNew[newBucket]) >= newBucketSize { if len(a.addrNew[*ka.subnetworkID][newBucket]) >= newBucketSize {
newBucket = oldBucket newBucket = oldBucket
} }
@ -928,13 +1011,13 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
// We don't touch a.nTried here since the number of tried stays the same // We don't touch a.nTried here since the number of tried stays the same
// but we decemented new above, raise it again since we're putting // but we decemented new above, raise it again since we're putting
// something back. // something back.
a.nNew++ a.nNew[*ka.subnetworkID]++
rmkey := NetAddressKey(rmka.na) rmkey := NetAddressKey(rmka.na)
log.Tracef("Replacing %s with %s in tried", rmkey, addrKey) log.Tracef("Replacing %s with %s in tried", rmkey, addrKey)
// We made sure there is space here just above. // We made sure there is space here just above.
a.addrNew[newBucket][rmkey] = rmka a.addrNew[*ka.subnetworkID][newBucket][rmkey] = rmka
} }
// AddLocalAddress adds na to the list of known local addresses to advertise // AddLocalAddress adds na to the list of known local addresses to advertise
@ -1082,13 +1165,14 @@ func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddress) *wire.Net
// New returns a new bitcoin address manager. // New returns a new bitcoin address manager.
// Use Start to begin processing asynchronous address updates. // Use Start to begin processing asynchronous address updates.
func New(dataDir string, lookupFunc func(string) ([]net.IP, error)) *AddrManager { func New(dataDir string, lookupFunc func(string) ([]net.IP, error), subnetworkID *subnetworkid.SubnetworkID) *AddrManager {
am := AddrManager{ am := AddrManager{
peersFile: filepath.Join(dataDir, "peers.json"), peersFile: filepath.Join(dataDir, "peers.json"),
lookupFunc: lookupFunc, lookupFunc: lookupFunc,
rand: rand.New(rand.NewSource(time.Now().UnixNano())), rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}), quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress), localAddresses: make(map[string]*localAddress),
localSubnetworkID: subnetworkID,
} }
am.reset() am.reset()
return &am return &am

View File

@ -12,6 +12,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/daglabs/btcd/addrmgr" "github.com/daglabs/btcd/addrmgr"
"github.com/daglabs/btcd/wire" "github.com/daglabs/btcd/wire"
) )
@ -103,7 +105,7 @@ func lookupFunc(host string) ([]net.IP, error) {
} }
func TestStartStop(t *testing.T) { func TestStartStop(t *testing.T) {
n := addrmgr.New("teststartstop", lookupFunc) n := addrmgr.New("teststartstop", lookupFunc, &wire.SubnetworkIDSupportsAll)
n.Start() n.Start()
err := n.Stop() err := n.Stop()
if err != nil { if err != nil {
@ -136,7 +138,7 @@ func TestAddAddressByIP(t *testing.T) {
}, },
} }
amgr := addrmgr.New("testaddressbyip", nil) amgr := addrmgr.New("testaddressbyip", nil, &wire.SubnetworkIDSupportsAll)
for i, test := range tests { for i, test := range tests {
err := amgr.AddAddressByIP(test.addrIP) err := amgr.AddAddressByIP(test.addrIP)
if test.err != nil && err == nil { if test.err != nil && err == nil {
@ -192,7 +194,7 @@ func TestAddLocalAddress(t *testing.T) {
true, true,
}, },
} }
amgr := addrmgr.New("testaddlocaladdress", nil) amgr := addrmgr.New("testaddlocaladdress", nil, &wire.SubnetworkIDSupportsAll)
for x, test := range tests { for x, test := range tests {
result := amgr.AddLocalAddress(&test.address, test.priority) result := amgr.AddLocalAddress(&test.address, test.priority)
if result == nil && !test.valid { if result == nil && !test.valid {
@ -209,7 +211,7 @@ func TestAddLocalAddress(t *testing.T) {
} }
func TestAttempt(t *testing.T) { func TestAttempt(t *testing.T) {
n := addrmgr.New("testattempt", lookupFunc) n := addrmgr.New("testattempt", lookupFunc, &wire.SubnetworkIDSupportsAll)
// Add a new address and get it // Add a new address and get it
err := n.AddAddressByIP(someIP + ":8333") err := n.AddAddressByIP(someIP + ":8333")
@ -231,7 +233,7 @@ func TestAttempt(t *testing.T) {
} }
func TestConnected(t *testing.T) { func TestConnected(t *testing.T) {
n := addrmgr.New("testconnected", lookupFunc) n := addrmgr.New("testconnected", lookupFunc, &wire.SubnetworkIDSupportsAll)
// Add a new address and get it // Add a new address and get it
err := n.AddAddressByIP(someIP + ":8333") err := n.AddAddressByIP(someIP + ":8333")
@ -251,7 +253,7 @@ func TestConnected(t *testing.T) {
} }
func TestNeedMoreAddresses(t *testing.T) { func TestNeedMoreAddresses(t *testing.T) {
n := addrmgr.New("testneedmoreaddresses", lookupFunc) n := addrmgr.New("testneedmoreaddresses", lookupFunc, &wire.SubnetworkIDSupportsAll)
addrsToAdd := 1500 addrsToAdd := 1500
b := n.NeedMoreAddresses() b := n.NeedMoreAddresses()
if !b { if !b {
@ -271,7 +273,7 @@ func TestNeedMoreAddresses(t *testing.T) {
srcAddr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0) srcAddr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0)
n.AddAddresses(addrs, srcAddr) n.AddAddresses(addrs, srcAddr)
numAddrs := n.NumAddresses() numAddrs := n.TotalNumAddresses()
if numAddrs > addrsToAdd { if numAddrs > addrsToAdd {
t.Errorf("Number of addresses is too many %d vs %d", numAddrs, addrsToAdd) t.Errorf("Number of addresses is too many %d vs %d", numAddrs, addrsToAdd)
} }
@ -283,7 +285,7 @@ func TestNeedMoreAddresses(t *testing.T) {
} }
func TestGood(t *testing.T) { func TestGood(t *testing.T) {
n := addrmgr.New("testgood", lookupFunc) n := addrmgr.New("testgood", lookupFunc, &wire.SubnetworkIDSupportsAll)
addrsToAdd := 64 * 64 addrsToAdd := 64 * 64
addrs := make([]*wire.NetAddress, addrsToAdd) addrs := make([]*wire.NetAddress, addrsToAdd)
@ -300,10 +302,10 @@ func TestGood(t *testing.T) {
n.AddAddresses(addrs, srcAddr) n.AddAddresses(addrs, srcAddr)
for _, addr := range addrs { for _, addr := range addrs {
n.Good(addr) n.Good(addr, &wire.SubnetworkIDSupportsAll)
} }
numAddrs := n.NumAddresses() numAddrs := n.TotalNumAddresses()
if numAddrs >= addrsToAdd { if numAddrs >= addrsToAdd {
t.Errorf("Number of addresses is too many: %d vs %d", numAddrs, addrsToAdd) t.Errorf("Number of addresses is too many: %d vs %d", numAddrs, addrsToAdd)
} }
@ -315,7 +317,8 @@ func TestGood(t *testing.T) {
} }
func TestGetAddress(t *testing.T) { func TestGetAddress(t *testing.T) {
n := addrmgr.New("testgetaddress", lookupFunc) localSubnetworkID := &subnetworkid.SubnetworkID{0xff}
n := addrmgr.New("testgetaddress", lookupFunc, localSubnetworkID)
// Get an address from an empty set (should error) // Get an address from an empty set (should error)
if rv := n.GetAddress(); rv != nil { if rv := n.GetAddress(); rv != nil {
@ -334,9 +337,12 @@ func TestGetAddress(t *testing.T) {
if ka.NetAddress().IP.String() != someIP { if ka.NetAddress().IP.String() != someIP {
t.Errorf("Wrong IP: got %v, want %v", ka.NetAddress().IP.String(), someIP) t.Errorf("Wrong IP: got %v, want %v", ka.NetAddress().IP.String(), someIP)
} }
if *ka.SubnetworkID() != wire.SubnetworkIDUnknown {
t.Errorf("Wrong Subnetwork ID: got %v, want %v", *ka.SubnetworkID(), wire.SubnetworkIDUnknown)
}
// Mark this as a good address and get it // Mark this as a good address and get it
n.Good(ka.NetAddress()) n.Good(ka.NetAddress(), localSubnetworkID)
ka = n.GetAddress() ka = n.GetAddress()
if ka == nil { if ka == nil {
t.Fatalf("Did not get an address where there is one in the pool") t.Fatalf("Did not get an address where there is one in the pool")
@ -344,8 +350,11 @@ func TestGetAddress(t *testing.T) {
if ka.NetAddress().IP.String() != someIP { if ka.NetAddress().IP.String() != someIP {
t.Errorf("Wrong IP: got %v, want %v", ka.NetAddress().IP.String(), someIP) t.Errorf("Wrong IP: got %v, want %v", ka.NetAddress().IP.String(), someIP)
} }
if *ka.SubnetworkID() != *localSubnetworkID {
t.Errorf("Wrong Subnetwork ID: got %v, want %v", ka.SubnetworkID(), localSubnetworkID)
}
numAddrs := n.NumAddresses() numAddrs := n.TotalNumAddresses()
if numAddrs != 1 { if numAddrs != 1 {
t.Errorf("Wrong number of addresses: got %d, want %d", numAddrs, 1) t.Errorf("Wrong number of addresses: got %d, want %d", numAddrs, 1)
} }
@ -401,7 +410,7 @@ func TestGetBestLocalAddress(t *testing.T) {
*/ */
} }
amgr := addrmgr.New("testgetbestlocaladdress", nil) amgr := addrmgr.New("testgetbestlocaladdress", nil, &wire.SubnetworkIDSupportsAll)
// Test against default when there's no address // Test against default when there's no address
for x, test := range tests { for x, test := range tests {

View File

@ -7,6 +7,8 @@ package addrmgr
import ( import (
"time" "time"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/daglabs/btcd/wire" "github.com/daglabs/btcd/wire"
) )
@ -20,6 +22,7 @@ type KnownAddress struct {
lastsuccess time.Time lastsuccess time.Time
tried bool tried bool
refs int // reference count of new buckets refs int // reference count of new buckets
subnetworkID *subnetworkid.SubnetworkID
} }
// NetAddress returns the underlying wire.NetAddress associated with the // NetAddress returns the underlying wire.NetAddress associated with the
@ -28,6 +31,11 @@ func (ka *KnownAddress) NetAddress() *wire.NetAddress {
return ka.na return ka.na
} }
// SubnetworkID returns the subnetwork ID of the known address.
func (ka *KnownAddress) SubnetworkID() *subnetworkid.SubnetworkID {
return ka.subnetworkID
}
// LastAttempt returns the last time the known address was attempted. // LastAttempt returns the last time the known address was attempted.
func (ka *KnownAddress) LastAttempt() time.Time { func (ka *KnownAddress) LastAttempt() time.Time {
return ka.lastattempt return ka.lastattempt

View File

@ -429,7 +429,7 @@ func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
} }
// Mark the address as a known good address. // Mark the address as a known good address.
addrManager.Good(sp.NA()) addrManager.Good(sp.NA(), &msg.SubnetworkID)
} }
} }
@ -2313,7 +2313,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
services &^= wire.SFNodeCF services &^= wire.SFNodeCF
} }
amgr := addrmgr.New(config.MainConfig().DataDir, serverutils.BTCDLookup) amgr := addrmgr.New(config.MainConfig().DataDir, serverutils.BTCDLookup, config.MainConfig().SubnetworkID)
var listeners []net.Listener var listeners []net.Listener
var nat serverutils.NAT var nat serverutils.NAT
@ -2403,6 +2403,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
TimeSource: s.TimeSource, TimeSource: s.TimeSource,
SigCache: s.SigCache, SigCache: s.SigCache,
IndexManager: indexManager, IndexManager: indexManager,
SubnetworkID: config.MainConfig().SubnetworkID,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -119,6 +119,9 @@ var (
// SubnetworkIDRegistry is the subnetwork ID which is used for adding new sub networks to the registry // SubnetworkIDRegistry is the subnetwork ID which is used for adding new sub networks to the registry
SubnetworkIDRegistry = subnetworkid.SubnetworkID{2} SubnetworkIDRegistry = subnetworkid.SubnetworkID{2}
// SubnetworkIDUnknown is the subnetwork ID which is used for marking subnetwork ID as unknown in the adress manager
SubnetworkIDUnknown = subnetworkid.SubnetworkID{3}
) )
// scriptFreeList defines a free list of byte slices (up to the maximum number // scriptFreeList defines a free list of byte slices (up to the maximum number