[NOD-1017] Move peers.json to db (#733)

* [NOD-1017] Move peers.json to db

* [NOD-1017] Fix tests

* [NOD-1017] Change comments and rename variables

* [NOD-1017] Separate to smaller functions

* [NOD-1017] Renames

* [NOD-1017] Name newAddrManagerForTest return params

* [NOD-1017] Fix handling of non existing peersState

* [NOD-1017] Add getPeersState rpc command

* [NOD-1017] Fix comment

* [NOD-1017] Split long line

* [NOD-1017] Rename getPeersState->getPeerAddresses

* [NOD-1017] Rename getPeerInfo->getConnectedPeerInfo
Ori Newman 2020-06-18 12:12:49 +03:00 committed by GitHub
parent 1358911d95
commit 7bf8bb5436
16 changed files with 452 additions and 227 deletions
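The gist of the change: the address manager's peers state, previously written to a peers.json file with encoding/json, is now gob-encoded and stored under a database key through the new dbaccess.StorePeersState/FetchPeersState accessors. A minimal, standard-library-only sketch of the new encode/decode round trip (the struct below is a hypothetical stand-in for the real PeersStateForSerialization):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// peersState is an illustrative stand-in for PeersStateForSerialization.
type peersState struct {
	Version   int
	Addresses []string
}

func main() {
	in := peersState{Version: 1, Addresses: []string{"173.144.173.111:8333"}}

	// Encode with gob, as serializePeersState now does.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&in); err != nil {
		panic(err)
	}

	// Decode, as deserializePeersState now does.
	var out peersState
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.Version, out.Addresses)
}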


@ -5,16 +5,16 @@
package addrmgr
import (
"bytes"
"container/list"
crand "crypto/rand" // for seeding
"encoding/binary"
"encoding/json"
"encoding/gob"
"github.com/kaspanet/kaspad/dbaccess"
"github.com/pkg/errors"
"io"
"math/rand"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
@ -26,14 +26,13 @@ import (
"github.com/kaspanet/kaspad/wire"
)
type newBucket [newBucketCount]map[string]*KnownAddress
type triedBucket [triedBucketCount]*list.List
type newBucket [NewBucketCount]map[string]*KnownAddress
type triedBucket [TriedBucketCount]*list.List
// AddrManager provides a concurrency safe address manager for caching potential
// peers on the Kaspa network.
type AddrManager struct {
mtx sync.Mutex
peersFile string
lookupFunc func(string) ([]net.IP, error)
rand *rand.Rand
key [32]byte
@ -66,10 +65,12 @@ type serializedKnownAddress struct {
// no refcount or tried, that is available from context.
}
type serializedNewBucket [newBucketCount][]string
type serializedTriedBucket [triedBucketCount][]string
type serializedNewBucket [NewBucketCount][]string
type serializedTriedBucket [TriedBucketCount][]string
type serializedAddrManager struct {
// PeersStateForSerialization is the data model that is used to
// serialize the peers state to any encoding.
type PeersStateForSerialization struct {
Version int
Key [32]byte
Addresses []*serializedKnownAddress
@ -118,17 +119,17 @@ const (
// tried address bucket.
triedBucketSize = 256
// triedBucketCount is the number of buckets we split tried
// TriedBucketCount is the number of buckets we split tried
// addresses over.
triedBucketCount = 64
TriedBucketCount = 64
// newBucketSize is the maximum number of addresses in each new address
// bucket.
newBucketSize = 64
// newBucketCount is the number of buckets that we spread new addresses
// NewBucketCount is the number of buckets that we spread new addresses
// over.
newBucketCount = 1024
NewBucketCount = 1024
// triedBucketsPerGroup is the number of tried buckets over which an
// address group will be spread.
@ -171,8 +172,8 @@ const (
// will share with a call to AddressCache.
getAddrPercent = 23
// serialisationVersion is the current version of the on-disk format.
serialisationVersion = 1
// serializationVersion is the current version of the on-disk format.
serializationVersion = 1
)
// updateAddress is a helper function to either update an address already known
@ -392,7 +393,7 @@ func (a *AddrManager) getNewBucket(netAddr, srcAddr *wire.NetAddress) int {
data2 = append(data2, hashbuf[:]...)
hash2 := daghash.DoubleHashB(data2)
return int(binary.LittleEndian.Uint64(hash2) % newBucketCount)
return int(binary.LittleEndian.Uint64(hash2) % NewBucketCount)
}
func (a *AddrManager) getTriedBucket(netAddr *wire.NetAddress) int {
@ -411,7 +412,7 @@ func (a *AddrManager) getTriedBucket(netAddr *wire.NetAddress) int {
data2 = append(data2, hashbuf[:]...)
hash2 := daghash.DoubleHashB(data2)
return int(binary.LittleEndian.Uint64(hash2) % triedBucketCount)
return int(binary.LittleEndian.Uint64(hash2) % TriedBucketCount)
}
// addressHandler is the main handler for the address manager. It must be run
@ -423,30 +424,62 @@ out:
for {
select {
case <-dumpAddressTicker.C:
a.savePeers()
err := a.savePeers()
if err != nil {
panic(errors.Wrap(err, "error saving peers"))
}
case <-a.quit:
break out
}
}
a.savePeers()
err := a.savePeers()
if err != nil {
panic(errors.Wrap(err, "error saving peers"))
}
a.wg.Done()
log.Trace("Address handler done")
}
// savePeers saves all the known addresses to a file so they can be read back
// savePeers saves all the known addresses to the database so they can be read back
// in at next run.
func (a *AddrManager) savePeers() {
func (a *AddrManager) savePeers() error {
serializedPeersState, err := a.serializePeersState()
if err != nil {
return err
}
return dbaccess.StorePeersState(dbaccess.NoTx(), serializedPeersState)
}
func (a *AddrManager) serializePeersState() ([]byte, error) {
peersState, err := a.PeersStateForSerialization()
if err != nil {
return nil, err
}
w := &bytes.Buffer{}
encoder := gob.NewEncoder(w)
err = encoder.Encode(&peersState)
if err != nil {
return nil, errors.Wrap(err, "failed to encode peers state")
}
return w.Bytes(), nil
}
// PeersStateForSerialization returns the data model that is used to serialize the peers state to any encoding.
func (a *AddrManager) PeersStateForSerialization() (*PeersStateForSerialization, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
// First we make a serialisable datastructure so we can encode it to
// json.
sam := new(serializedAddrManager)
sam.Version = serialisationVersion
copy(sam.Key[:], a.key[:])
// First we make a serializable data structure so we can encode it to
// gob.
peersState := new(PeersStateForSerialization)
peersState.Version = serializationVersion
copy(peersState.Key[:], a.key[:])
sam.Addresses = make([]*serializedKnownAddress, len(a.addrIndex))
peersState.Addresses = make([]*serializedKnownAddress, len(a.addrIndex))
i := 0
for k, v := range a.addrIndex {
ska := new(serializedKnownAddress)
@ -463,119 +496,104 @@ func (a *AddrManager) savePeers() {
ska.LastSuccess = v.lastsuccess.Unix()
// Tried and refs are implicit in the rest of the structure
// and will be worked out from context on deserialization.
sam.Addresses[i] = ska
peersState.Addresses[i] = ska
i++
}
sam.NewBuckets = make(map[string]*serializedNewBucket)
peersState.NewBuckets = make(map[string]*serializedNewBucket)
for subnetworkID := range a.addrNew {
subnetworkIDStr := subnetworkID.String()
sam.NewBuckets[subnetworkIDStr] = &serializedNewBucket{}
peersState.NewBuckets[subnetworkIDStr] = &serializedNewBucket{}
for i := range a.addrNew[subnetworkID] {
sam.NewBuckets[subnetworkIDStr][i] = make([]string, len(a.addrNew[subnetworkID][i]))
peersState.NewBuckets[subnetworkIDStr][i] = make([]string, len(a.addrNew[subnetworkID][i]))
j := 0
for k := range a.addrNew[subnetworkID][i] {
sam.NewBuckets[subnetworkIDStr][i][j] = k
peersState.NewBuckets[subnetworkIDStr][i][j] = k
j++
}
}
}
for i := range a.addrNewFullNodes {
sam.NewBucketFullNodes[i] = make([]string, len(a.addrNewFullNodes[i]))
peersState.NewBucketFullNodes[i] = make([]string, len(a.addrNewFullNodes[i]))
j := 0
for k := range a.addrNewFullNodes[i] {
sam.NewBucketFullNodes[i][j] = k
peersState.NewBucketFullNodes[i][j] = k
j++
}
}
sam.TriedBuckets = make(map[string]*serializedTriedBucket)
peersState.TriedBuckets = make(map[string]*serializedTriedBucket)
for subnetworkID := range a.addrTried {
subnetworkIDStr := subnetworkID.String()
sam.TriedBuckets[subnetworkIDStr] = &serializedTriedBucket{}
peersState.TriedBuckets[subnetworkIDStr] = &serializedTriedBucket{}
for i := range a.addrTried[subnetworkID] {
sam.TriedBuckets[subnetworkIDStr][i] = make([]string, a.addrTried[subnetworkID][i].Len())
peersState.TriedBuckets[subnetworkIDStr][i] = make([]string, a.addrTried[subnetworkID][i].Len())
j := 0
for e := a.addrTried[subnetworkID][i].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress)
sam.TriedBuckets[subnetworkIDStr][i][j] = NetAddressKey(ka.na)
peersState.TriedBuckets[subnetworkIDStr][i][j] = NetAddressKey(ka.na)
j++
}
}
}
for i := range a.addrTriedFullNodes {
sam.TriedBucketFullNodes[i] = make([]string, a.addrTriedFullNodes[i].Len())
peersState.TriedBucketFullNodes[i] = make([]string, a.addrTriedFullNodes[i].Len())
j := 0
for e := a.addrTriedFullNodes[i].Front(); e != nil; e = e.Next() {
ka := e.Value.(*KnownAddress)
sam.TriedBucketFullNodes[i][j] = NetAddressKey(ka.na)
peersState.TriedBucketFullNodes[i][j] = NetAddressKey(ka.na)
j++
}
}
w, err := os.Create(a.peersFile)
if err != nil {
log.Errorf("Error opening file %s: %s", a.peersFile, err)
return
}
enc := json.NewEncoder(w)
defer w.Close()
if err := enc.Encode(&sam); err != nil {
log.Errorf("Failed to encode file %s: %s", a.peersFile, err)
return
}
return peersState, nil
}
// loadPeers loads the known address from the saved file. If empty, missing, or
// malformed file, just don't load anything and start fresh
func (a *AddrManager) loadPeers() {
// loadPeers loads the known addresses from the database. If missing,
// just don't load anything and start fresh.
func (a *AddrManager) loadPeers() error {
a.mtx.Lock()
defer a.mtx.Unlock()
err := a.deserializePeers(a.peersFile)
if err != nil {
log.Errorf("Failed to parse file %s: %s", a.peersFile, err)
// if it is invalid we nuke the old one unconditionally.
err = os.Remove(a.peersFile)
if err != nil {
log.Warnf("Failed to remove corrupt peers file %s: %s",
a.peersFile, err)
}
serializedPeerState, err := dbaccess.FetchPeersState(dbaccess.NoTx())
if dbaccess.IsNotFoundError(err) {
a.reset()
return
}
log.Infof("Loaded %d addresses from file '%s'", a.totalNumAddresses(), a.peersFile)
}
func (a *AddrManager) deserializePeers(filePath string) error {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
log.Info("No peers state was found in the database. Created a new one", a.totalNumAddresses())
return nil
}
r, err := os.Open(filePath)
if err != nil {
return errors.Errorf("%s error opening file: %s", filePath, err)
}
defer r.Close()
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam)
if err != nil {
return errors.Errorf("error reading %s: %s", filePath, err)
return err
}
if sam.Version != serialisationVersion {
err = a.deserializePeersState(serializedPeerState)
if err != nil {
return err
}
log.Infof("Loaded %d addresses from database", a.totalNumAddresses())
return nil
}
func (a *AddrManager) deserializePeersState(serializedPeerState []byte) error {
var peersState PeersStateForSerialization
r := bytes.NewBuffer(serializedPeerState)
dec := gob.NewDecoder(r)
err := dec.Decode(&peersState)
if err != nil {
return errors.Wrap(err, "error deserializing peers state")
}
if peersState.Version != serializationVersion {
return errors.Errorf("unknown version %d in serialized "+
"addrmanager", sam.Version)
"peers state", peersState.Version)
}
copy(a.key[:], sam.Key[:])
copy(a.key[:], peersState.Key[:])
for _, v := range sam.Addresses {
for _, v := range peersState.Addresses {
ka := new(KnownAddress)
ka.na, err = a.DeserializeNetAddress(v.Addr)
if err != nil {
@ -600,12 +618,12 @@ func (a *AddrManager) deserializePeers(filePath string) error {
a.addrIndex[NetAddressKey(ka.na)] = ka
}
for subnetworkIDStr := range sam.NewBuckets {
for subnetworkIDStr := range peersState.NewBuckets {
subnetworkID, err := subnetworkid.NewFromStr(subnetworkIDStr)
if err != nil {
return err
}
for i, subnetworkNewBucket := range sam.NewBuckets[subnetworkIDStr] {
for i, subnetworkNewBucket := range peersState.NewBuckets[subnetworkIDStr] {
for _, val := range subnetworkNewBucket {
ka, ok := a.addrIndex[val]
if !ok {
@ -622,7 +640,7 @@ func (a *AddrManager) deserializePeers(filePath string) error {
}
}
for i, newBucket := range sam.NewBucketFullNodes {
for i, newBucket := range peersState.NewBucketFullNodes {
for _, val := range newBucket {
ka, ok := a.addrIndex[val]
if !ok {
@ -638,12 +656,12 @@ func (a *AddrManager) deserializePeers(filePath string) error {
}
}
for subnetworkIDStr := range sam.TriedBuckets {
for subnetworkIDStr := range peersState.TriedBuckets {
subnetworkID, err := subnetworkid.NewFromStr(subnetworkIDStr)
if err != nil {
return err
}
for i, subnetworkTriedBucket := range sam.TriedBuckets[subnetworkIDStr] {
for i, subnetworkTriedBucket := range peersState.TriedBuckets[subnetworkIDStr] {
for _, val := range subnetworkTriedBucket {
ka, ok := a.addrIndex[val]
if !ok {
@ -658,7 +676,7 @@ func (a *AddrManager) deserializePeers(filePath string) error {
}
}
for i, triedBucket := range sam.TriedBucketFullNodes {
for i, triedBucket := range peersState.TriedBucketFullNodes {
for _, val := range triedBucket {
ka, ok := a.addrIndex[val]
if !ok {
@ -704,20 +722,24 @@ func (a *AddrManager) DeserializeNetAddress(addr string) (*wire.NetAddress, erro
// Start begins the core address handler which manages a pool of known
// addresses, timeouts, and interval based writes.
func (a *AddrManager) Start() {
func (a *AddrManager) Start() error {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
return nil
}
log.Trace("Starting address manager")
// Load peers we already know about from file.
a.loadPeers()
// Load peers we already know about from the database.
err := a.loadPeers()
if err != nil {
return err
}
// Start the address ticker to save addresses periodically.
a.wg.Add(1)
spawn(a.addressHandler)
return nil
}
// Stop gracefully shuts down the address manager by stopping the main handler.
@ -1333,9 +1355,8 @@ func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddress) *wire.Net
// New returns a new Kaspa address manager.
// Use Start to begin processing asynchronous address updates.
func New(dataDir string, lookupFunc func(string) ([]net.IP, error), subnetworkID *subnetworkid.SubnetworkID) *AddrManager {
func New(lookupFunc func(string) ([]net.IP, error), subnetworkID *subnetworkid.SubnetworkID) *AddrManager {
am := AddrManager{
peersFile: filepath.Join(dataDir, "peers.json"),
lookupFunc: lookupFunc,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}),
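Taken together, the changes to New and Start above alter every call site: New no longer receives a dataDir, and Start must be error-checked. A hedged sketch of an adapted caller (the helper name is illustrative, and it assumes the host process has already opened the database with dbaccess.Open):

package example

import (
	"net"

	"github.com/kaspanet/kaspad/addrmgr"
	"github.com/kaspanet/kaspad/util/subnetworkid"
)

// startAddressManager constructs and starts an address manager under the
// new API. Start can now fail, since loadPeers reads from the database.
func startAddressManager(lookupFunc func(string) ([]net.IP, error),
	subnetworkID *subnetworkid.SubnetworkID) (*addrmgr.AddrManager, error) {

	addressManager := addrmgr.New(lookupFunc, subnetworkID)
	err := addressManager.Start()
	if err != nil {
		return nil, err
	}
	return addressManager, nil
}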


@ -8,7 +8,9 @@ import (
"fmt"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/dbaccess"
"github.com/kaspanet/kaspad/util/subnetworkid"
"io/ioutil"
"net"
"reflect"
"testing"
@ -101,14 +103,41 @@ func addNaTest(ip string, port uint16, want string) {
naTests = append(naTests, test)
}
func lookupFunc(host string) ([]net.IP, error) {
func lookupFuncForTest(host string) ([]net.IP, error) {
return nil, errors.New("not implemented")
}
func newAddrManagerForTest(t *testing.T, testName string,
localSubnetworkID *subnetworkid.SubnetworkID) (addressManager *AddrManager, teardown func()) {
dbPath, err := ioutil.TempDir("", testName)
if err != nil {
t.Fatalf("Error creating temporary directory: %s", err)
}
err = dbaccess.Open(dbPath)
if err != nil {
t.Fatalf("error creating db: %s", err)
}
addressManager = New(lookupFuncForTest, localSubnetworkID)
return addressManager, func() {
err := dbaccess.Close()
if err != nil {
t.Fatalf("error closing the database: %s", err)
}
}
}
func TestStartStop(t *testing.T) {
n := New("teststartstop", lookupFunc, nil)
n.Start()
err := n.Stop()
amgr, teardown := newAddrManagerForTest(t, "TestStartStop", nil)
defer teardown()
err := amgr.Start()
if err != nil {
t.Fatalf("Address Manager failed to start: %v", err)
}
err = amgr.Stop()
if err != nil {
t.Fatalf("Address Manager failed to stop: %v", err)
}
@ -148,7 +177,8 @@ func TestAddAddressByIP(t *testing.T) {
},
}
amgr := New("testaddressbyip", nil, nil)
amgr, teardown := newAddrManagerForTest(t, "TestAddAddressByIP", nil)
defer teardown()
for i, test := range tests {
err := amgr.AddAddressByIP(test.addrIP, nil)
if test.err != nil && err == nil {
@ -213,7 +243,8 @@ func TestAddLocalAddress(t *testing.T) {
true,
},
}
amgr := New("testaddlocaladdress", nil, nil)
amgr, teardown := newAddrManagerForTest(t, "TestAddLocalAddress", nil)
defer teardown()
for x, test := range tests {
result := amgr.AddLocalAddress(&test.address, test.priority)
if result == nil && !test.valid {
@ -239,21 +270,22 @@ func TestAttempt(t *testing.T) {
})
defer config.SetActiveConfig(originalActiveCfg)
n := New("testattempt", lookupFunc, nil)
amgr, teardown := newAddrManagerForTest(t, "TestAttempt", nil)
defer teardown()
// Add a new address and get it
err := n.AddAddressByIP(someIP+":8333", nil)
err := amgr.AddAddressByIP(someIP+":8333", nil)
if err != nil {
t.Fatalf("Adding address failed: %v", err)
}
ka := n.GetAddress()
ka := amgr.GetAddress()
if !ka.LastAttempt().IsZero() {
t.Errorf("Address should not have attempts, but does")
}
na := ka.NetAddress()
n.Attempt(na)
amgr.Attempt(na)
if ka.LastAttempt().IsZero() {
t.Errorf("Address should have an attempt, but does not")
@ -270,19 +302,20 @@ func TestConnected(t *testing.T) {
})
defer config.SetActiveConfig(originalActiveCfg)
n := New("testconnected", lookupFunc, nil)
amgr, teardown := newAddrManagerForTest(t, "TestConnected", nil)
defer teardown()
// Add a new address and get it
err := n.AddAddressByIP(someIP+":8333", nil)
err := amgr.AddAddressByIP(someIP+":8333", nil)
if err != nil {
t.Fatalf("Adding address failed: %v", err)
}
ka := n.GetAddress()
ka := amgr.GetAddress()
na := ka.NetAddress()
// make it an hour ago
na.Timestamp = time.Unix(time.Now().Add(time.Hour*-1).Unix(), 0)
n.Connected(na)
amgr.Connected(na)
if !ka.NetAddress().Timestamp.After(na.Timestamp) {
t.Errorf("Address should have a new timestamp, but does not")
@ -299,9 +332,10 @@ func TestNeedMoreAddresses(t *testing.T) {
})
defer config.SetActiveConfig(originalActiveCfg)
n := New("testneedmoreaddresses", lookupFunc, nil)
amgr, teardown := newAddrManagerForTest(t, "TestNeedMoreAddresses", nil)
defer teardown()
addrsToAdd := 1500
b := n.NeedMoreAddresses()
b := amgr.NeedMoreAddresses()
if !b {
t.Errorf("Expected that we need more addresses")
}
@ -310,7 +344,7 @@ func TestNeedMoreAddresses(t *testing.T) {
var err error
for i := 0; i < addrsToAdd; i++ {
s := fmt.Sprintf("%d.%d.173.147:8333", i/128+60, i%128+60)
addrs[i], err = n.DeserializeNetAddress(s)
addrs[i], err = amgr.DeserializeNetAddress(s)
if err != nil {
t.Errorf("Failed to turn %s into an address: %v", s, err)
}
@ -318,13 +352,13 @@ func TestNeedMoreAddresses(t *testing.T) {
srcAddr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0)
n.AddAddresses(addrs, srcAddr, nil)
numAddrs := n.TotalNumAddresses()
amgr.AddAddresses(addrs, srcAddr, nil)
numAddrs := amgr.TotalNumAddresses()
if numAddrs > addrsToAdd {
t.Errorf("Number of addresses is too many %d vs %d", numAddrs, addrsToAdd)
}
b = n.NeedMoreAddresses()
b = amgr.NeedMoreAddresses()
if b {
t.Errorf("Expected that we don't need more addresses")
}
@ -340,7 +374,8 @@ func TestGood(t *testing.T) {
})
defer config.SetActiveConfig(originalActiveCfg)
n := New("testgood", lookupFunc, nil)
amgr, teardown := newAddrManagerForTest(t, "TestGood", nil)
defer teardown()
addrsToAdd := 64 * 64
addrs := make([]*wire.NetAddress, addrsToAdd)
subnetworkCount := 32
@ -349,7 +384,7 @@ func TestGood(t *testing.T) {
var err error
for i := 0; i < addrsToAdd; i++ {
s := fmt.Sprintf("%d.173.147.%d:8333", i/64+60, i%64+60)
addrs[i], err = n.DeserializeNetAddress(s)
addrs[i], err = amgr.DeserializeNetAddress(s)
if err != nil {
t.Errorf("Failed to turn %s into an address: %v", s, err)
}
@ -361,24 +396,24 @@ func TestGood(t *testing.T) {
srcAddr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0)
n.AddAddresses(addrs, srcAddr, nil)
amgr.AddAddresses(addrs, srcAddr, nil)
for i, addr := range addrs {
n.Good(addr, subnetworkIDs[i%subnetworkCount])
amgr.Good(addr, subnetworkIDs[i%subnetworkCount])
}
numAddrs := n.TotalNumAddresses()
numAddrs := amgr.TotalNumAddresses()
if numAddrs >= addrsToAdd {
t.Errorf("Number of addresses is too many: %d vs %d", numAddrs, addrsToAdd)
}
numCache := len(n.AddressCache(true, nil))
numCache := len(amgr.AddressCache(true, nil))
if numCache == 0 || numCache >= numAddrs/4 {
t.Errorf("Number of addresses in cache: got %d, want positive and less than %d",
numCache, numAddrs/4)
}
for i := 0; i < subnetworkCount; i++ {
numCache = len(n.AddressCache(false, subnetworkIDs[i]))
numCache = len(amgr.AddressCache(false, subnetworkIDs[i]))
if numCache == 0 || numCache >= numAddrs/subnetworkCount {
t.Errorf("Number of addresses in subnetwork cache: got %d, want positive and less than %d",
numCache, numAddrs/4/subnetworkCount)
@ -396,17 +431,18 @@ func TestGoodChangeSubnetworkID(t *testing.T) {
})
defer config.SetActiveConfig(originalActiveCfg)
n := New("test_good_change_subnetwork_id", lookupFunc, nil)
amgr, teardown := newAddrManagerForTest(t, "TestGoodChangeSubnetworkID", nil)
defer teardown()
addr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0)
addrKey := NetAddressKey(addr)
srcAddr := wire.NewNetAddressIPPort(net.IPv4(173, 144, 173, 111), 8333, 0)
oldSubnetwork := subnetworkid.SubnetworkIDNative
n.AddAddress(addr, srcAddr, oldSubnetwork)
n.Good(addr, oldSubnetwork)
amgr.AddAddress(addr, srcAddr, oldSubnetwork)
amgr.Good(addr, oldSubnetwork)
// make sure address was saved to addrIndex under oldSubnetwork
ka := n.find(addr)
ka := amgr.find(addr)
if ka == nil {
t.Fatalf("Address was not found after first time .Good called")
}
@ -415,7 +451,7 @@ func TestGoodChangeSubnetworkID(t *testing.T) {
}
// make sure address was added to correct bucket under oldSubnetwork
bucket := n.addrTried[*oldSubnetwork][n.getTriedBucket(addr)]
bucket := amgr.addrTried[*oldSubnetwork][amgr.getTriedBucket(addr)]
wasFound := false
for e := bucket.Front(); e != nil; e = e.Next() {
if NetAddressKey(e.Value.(*KnownAddress).NetAddress()) == addrKey {
@ -428,10 +464,10 @@ func TestGoodChangeSubnetworkID(t *testing.T) {
// now call .Good again with a different subnetwork
newSubnetwork := subnetworkid.SubnetworkIDRegistry
n.Good(addr, newSubnetwork)
amgr.Good(addr, newSubnetwork)
// make sure address was updated in addrIndex under newSubnetwork
ka = n.find(addr)
ka = amgr.find(addr)
if ka == nil {
t.Fatalf("Address was not found after second time .Good called")
}
@ -440,7 +476,7 @@ func TestGoodChangeSubnetworkID(t *testing.T) {
}
// make sure address was removed from bucket under oldSubnetwork
bucket = n.addrTried[*oldSubnetwork][n.getTriedBucket(addr)]
bucket = amgr.addrTried[*oldSubnetwork][amgr.getTriedBucket(addr)]
wasFound = false
for e := bucket.Front(); e != nil; e = e.Next() {
if NetAddressKey(e.Value.(*KnownAddress).NetAddress()) == addrKey {
@ -452,7 +488,7 @@ func TestGoodChangeSubnetworkID(t *testing.T) {
}
// make sure address was added to correct bucket under newSubnetwork
bucket = n.addrTried[*newSubnetwork][n.getTriedBucket(addr)]
bucket = amgr.addrTried[*newSubnetwork][amgr.getTriedBucket(addr)]
wasFound = false
for e := bucket.Front(); e != nil; e = e.Next() {
if NetAddressKey(e.Value.(*KnownAddress).NetAddress()) == addrKey {
@ -475,34 +511,35 @@ func TestGetAddress(t *testing.T) {
defer config.SetActiveConfig(originalActiveCfg)
localSubnetworkID := &subnetworkid.SubnetworkID{0xff}
n := New("testgetaddress", lookupFunc, localSubnetworkID)
amgr, teardown := newAddrManagerForTest(t, "TestGetAddress", localSubnetworkID)
defer teardown()
// Get an address from an empty set (should error)
if rv := n.GetAddress(); rv != nil {
if rv := amgr.GetAddress(); rv != nil {
t.Errorf("GetAddress failed: got: %v want: %v\n", rv, nil)
}
// Add a new address and get it
err := n.AddAddressByIP(someIP+":8332", localSubnetworkID)
err := amgr.AddAddressByIP(someIP+":8332", localSubnetworkID)
if err != nil {
t.Fatalf("Adding address failed: %v", err)
}
ka := n.GetAddress()
ka := amgr.GetAddress()
if ka == nil {
t.Fatalf("Did not get an address where there is one in the pool")
}
n.Attempt(ka.NetAddress())
amgr.Attempt(ka.NetAddress())
// Checks that we don't get it if we find that it has a different subnetwork ID than expected.
actualSubnetworkID := &subnetworkid.SubnetworkID{0xfe}
n.Good(ka.NetAddress(), actualSubnetworkID)
ka = n.GetAddress()
amgr.Good(ka.NetAddress(), actualSubnetworkID)
ka = amgr.GetAddress()
if ka != nil {
t.Errorf("Didn't expect to get an address because there shouldn't be any address from subnetwork ID %s or nil", localSubnetworkID)
}
// Checks that the total number of addresses was incremented even though the new address is not a full node or a partial node of the same subnetwork as the local node.
numAddrs := n.TotalNumAddresses()
numAddrs := amgr.TotalNumAddresses()
if numAddrs != 1 {
t.Errorf("Wrong number of addresses: got %d, want %d", numAddrs, 1)
}
@ -510,11 +547,11 @@ func TestGetAddress(t *testing.T) {
// Now we repeat the same process, but now the address has the expected subnetwork ID.
// Add a new address and get it
err = n.AddAddressByIP(someIP+":8333", localSubnetworkID)
err = amgr.AddAddressByIP(someIP+":8333", localSubnetworkID)
if err != nil {
t.Fatalf("Adding address failed: %v", err)
}
ka = n.GetAddress()
ka = amgr.GetAddress()
if ka == nil {
t.Fatalf("Did not get an address where there is one in the pool")
}
@ -524,11 +561,11 @@ func TestGetAddress(t *testing.T) {
if !ka.SubnetworkID().IsEqual(localSubnetworkID) {
t.Errorf("Wrong Subnetwork ID: got %v, want %v", *ka.SubnetworkID(), localSubnetworkID)
}
n.Attempt(ka.NetAddress())
amgr.Attempt(ka.NetAddress())
// Mark this as a good address and get it
n.Good(ka.NetAddress(), localSubnetworkID)
ka = n.GetAddress()
amgr.Good(ka.NetAddress(), localSubnetworkID)
ka = amgr.GetAddress()
if ka == nil {
t.Fatalf("Did not get an address where there is one in the pool")
}
@ -539,7 +576,7 @@ func TestGetAddress(t *testing.T) {
t.Errorf("Wrong Subnetwork ID: got %v, want %v", ka.SubnetworkID(), localSubnetworkID)
}
numAddrs = n.TotalNumAddresses()
numAddrs = amgr.TotalNumAddresses()
if numAddrs != 2 {
t.Errorf("Wrong number of addresses: got %d, want %d", numAddrs, 1)
}
@ -604,7 +641,8 @@ func TestGetBestLocalAddress(t *testing.T) {
*/
}
amgr := New("testgetbestlocaladdress", nil, nil)
amgr, teardown := newAddrManagerForTest(t, "TestGetBestLocalAddress", nil)
defer teardown()
// Test against default when there's no address
for x, test := range tests {


@ -9,11 +9,11 @@ import (
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/dbaccess"
"github.com/pkg/errors"
"io"
"io/ioutil"
"net"
"os"
"sync/atomic"
"testing"
"time"
@ -192,19 +192,26 @@ func addressManagerForTest(t *testing.T, testName string, numAddresses uint8) (*
}
func createEmptyAddressManagerForTest(t *testing.T, testName string) (*addrmgr.AddrManager, func()) {
path, err := ioutil.TempDir("", fmt.Sprintf("%s-addressmanager", testName))
path, err := ioutil.TempDir("", fmt.Sprintf("%s-database", testName))
if err != nil {
t.Fatalf("createEmptyAddressManagerForTest: TempDir unexpectedly "+
"failed: %s", err)
}
return addrmgr.New(path, nil, nil), func() {
// Wait for the connection manager to finish
err = dbaccess.Open(path)
if err != nil {
t.Fatalf("error creating db: %s", err)
}
return addrmgr.New(nil, nil), func() {
// Wait for the connection manager to finish, so it'll
// have access to the address manager as long as it's
// alive.
time.Sleep(10 * time.Millisecond)
err := os.RemoveAll(path)
err := dbaccess.Close()
if err != nil {
t.Fatalf("couldn't remove path %s", path)
t.Fatalf("error closing the database: %s", err)
}
}
}

dbaccess/peers.go (new file, 26 lines)

@ -0,0 +1,26 @@
package dbaccess
import "github.com/kaspanet/kaspad/database"
var (
peersKey = database.MakeBucket().Key([]byte("peers"))
)
// StorePeersState stores the peers state in the database.
func StorePeersState(context Context, peersState []byte) error {
accessor, err := context.accessor()
if err != nil {
return err
}
return accessor.Put(peersKey, peersState)
}
// FetchPeersState retrieves the peers state from the database.
// Returns ErrNotFound if the state is missing from the database.
func FetchPeersState(context Context) ([]byte, error) {
accessor, err := context.accessor()
if err != nil {
return nil, err
}
return accessor.Get(peersKey)
}
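A short usage sketch of this accessor pair, mirroring how savePeers and loadPeers drive it; the helper name is illustrative:

package example

import (
	"log"

	"github.com/kaspanet/kaspad/dbaccess"
)

// storeAndFetchPeersState does a plain put followed by a get, treating
// not-found as "start fresh" the way loadPeers does.
func storeAndFetchPeersState(serialized []byte) error {
	err := dbaccess.StorePeersState(dbaccess.NoTx(), serialized)
	if err != nil {
		return err
	}

	state, err := dbaccess.FetchPeersState(dbaccess.NoTx())
	if dbaccess.IsNotFoundError(err) {
		log.Print("no peers state in the database; starting fresh")
		return nil
	}
	if err != nil {
		return err
	}
	log.Printf("fetched %d bytes of peers state", len(state))
	return nil
}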


@ -186,26 +186,26 @@ func (c *Client) PingAsync() FuturePingResult {
// Ping queues a ping to be sent to each connected peer.
//
// Use the GetPeerInfo function and examine the PingTime and PingWait fields to
// Use the GetConnectedPeerInfo function and examine the PingTime and PingWait fields to
// access the ping times.
func (c *Client) Ping() error {
return c.PingAsync().Receive()
}
// FutureGetPeerInfoResult is a future promise to deliver the result of a
// GetPeerInfoAsync RPC invocation (or an applicable error).
type FutureGetPeerInfoResult chan *response
// FutureGetConnectedPeerInfo is a future promise to deliver the result of a
// GetConnectedPeerInfoAsync RPC invocation (or an applicable error).
type FutureGetConnectedPeerInfo chan *response
// Receive waits for the response promised by the future and returns data about
// each connected network peer.
func (r FutureGetPeerInfoResult) Receive() ([]rpcmodel.GetPeerInfoResult, error) {
func (r FutureGetConnectedPeerInfo) Receive() ([]rpcmodel.GetConnectedPeerInfoResult, error) {
res, err := receiveFuture(r)
if err != nil {
return nil, err
}
// Unmarshal result as an array of getpeerinfo result objects.
var peerInfo []rpcmodel.GetPeerInfoResult
// Unmarshal result as an array of getConnectedPeerInfo result objects.
var peerInfo []rpcmodel.GetConnectedPeerInfoResult
err = json.Unmarshal(res, &peerInfo)
if err != nil {
return nil, err
@ -214,19 +214,19 @@ func (r FutureGetPeerInfoResult) Receive() ([]rpcmodel.GetPeerInfoResult, error)
return peerInfo, nil
}
// GetPeerInfoAsync returns an instance of a type that can be used to get the
// GetConnectedPeerInfoAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on the
// returned instance.
//
// See GetPeerInfo for the blocking version and more details.
func (c *Client) GetPeerInfoAsync() FutureGetPeerInfoResult {
cmd := rpcmodel.NewGetPeerInfoCmd()
// See GetConnectedPeerInfo for the blocking version and more details.
func (c *Client) GetConnectedPeerInfoAsync() FutureGetConnectedPeerInfo {
cmd := rpcmodel.NewGetConnectedPeerInfoCmd()
return c.sendCmd(cmd)
}
// GetPeerInfo returns data about each connected network peer.
func (c *Client) GetPeerInfo() ([]rpcmodel.GetPeerInfoResult, error) {
return c.GetPeerInfoAsync().Receive()
// GetConnectedPeerInfo returns data about each connected network peer.
func (c *Client) GetConnectedPeerInfo() ([]rpcmodel.GetConnectedPeerInfoResult, error) {
return c.GetConnectedPeerInfoAsync().Receive()
}
// FutureGetNetTotalsResult is a future promise to deliver the result of a
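A usage sketch of the renamed client call, assuming the conventional github.com/kaspanet/kaspad/rpcclient import path and an already-configured Client; the helper is illustrative, and Addr/SyncNode come from the renamed GetConnectedPeerInfoResult model:

package example

import (
	"fmt"

	"github.com/kaspanet/kaspad/rpcclient"
)

// printConnectedPeers lists each connected peer via the renamed RPC call.
func printConnectedPeers(client *rpcclient.Client) error {
	peers, err := client.GetConnectedPeerInfo()
	if err != nil {
		return err
	}
	for _, peer := range peers {
		fmt.Printf("%s (sync node: %t)\n", peer.Addr, peer.SyncNode)
	}
	return nil
}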


@ -382,13 +382,13 @@ func NewGetNetTotalsCmd() *GetNetTotalsCmd {
return &GetNetTotalsCmd{}
}
// GetPeerInfoCmd defines the getPeerInfo JSON-RPC command.
type GetPeerInfoCmd struct{}
// GetConnectedPeerInfoCmd defines the getConnectedPeerInfo JSON-RPC command.
type GetConnectedPeerInfoCmd struct{}
// NewGetPeerInfoCmd returns a new instance which can be used to issue a getpeer
// NewGetConnectedPeerInfoCmd returns a new instance which can be used to issue a getConnectedPeerInfo
// JSON-RPC command.
func NewGetPeerInfoCmd() *GetPeerInfoCmd {
return &GetPeerInfoCmd{}
func NewGetConnectedPeerInfoCmd() *GetConnectedPeerInfoCmd {
return &GetConnectedPeerInfoCmd{}
}
// GetRawMempoolCmd defines the getRawMempool JSON-RPC command.
@ -655,6 +655,14 @@ type VersionCmd struct{}
// version command.
func NewVersionCmd() *VersionCmd { return new(VersionCmd) }
// GetPeerAddressesCmd defines the getPeerAddresses JSON-RPC command.
type GetPeerAddressesCmd struct {
}
// NewGetPeerAddressesCmd returns a new instance which can be used to issue a JSON-RPC
// getPeerAddresses command.
func NewGetPeerAddressesCmd() *GetPeerAddressesCmd { return new(GetPeerAddressesCmd) }
func init() {
// No special flags for commands in this file.
flags := UsageFlag(0)
@ -681,7 +689,8 @@ func init() {
MustRegisterCommand("getMempoolInfo", (*GetMempoolInfoCmd)(nil), flags)
MustRegisterCommand("getNetworkInfo", (*GetNetworkInfoCmd)(nil), flags)
MustRegisterCommand("getNetTotals", (*GetNetTotalsCmd)(nil), flags)
MustRegisterCommand("getPeerInfo", (*GetPeerInfoCmd)(nil), flags)
MustRegisterCommand("getConnectedPeerInfo", (*GetConnectedPeerInfoCmd)(nil), flags)
MustRegisterCommand("getPeerAddresses", (*GetPeerAddressesCmd)(nil), flags)
MustRegisterCommand("getRawMempool", (*GetRawMempoolCmd)(nil), flags)
MustRegisterCommand("getSubnetwork", (*GetSubnetworkCmd)(nil), flags)
MustRegisterCommand("getTxOut", (*GetTxOutCmd)(nil), flags)


@ -444,15 +444,15 @@ func TestRPCServerCommands(t *testing.T) {
unmarshalled: &rpcmodel.GetNetTotalsCmd{},
},
{
name: "getPeerInfo",
name: "getConnectedPeerInfo",
newCmd: func() (interface{}, error) {
return rpcmodel.NewCommand("getPeerInfo")
return rpcmodel.NewCommand("getConnectedPeerInfo")
},
staticCmd: func() interface{} {
return rpcmodel.NewGetPeerInfoCmd()
return rpcmodel.NewGetConnectedPeerInfoCmd()
},
marshalled: `{"jsonrpc":"1.0","method":"getPeerInfo","params":[],"id":1}`,
unmarshalled: &rpcmodel.GetPeerInfoCmd{},
marshalled: `{"jsonrpc":"1.0","method":"getConnectedPeerInfo","params":[],"id":1}`,
unmarshalled: &rpcmodel.GetConnectedPeerInfoCmd{},
},
{
name: "getRawMempool",


@ -4,7 +4,10 @@
package rpcmodel
import "encoding/json"
import (
"encoding/json"
"github.com/kaspanet/kaspad/addrmgr"
)
// GetBlockHeaderVerboseResult models the data from the getblockheader command when
// the verbose flag is set. When the verbose flag is not set, getblockheader
@ -213,8 +216,8 @@ type GetNetworkInfoResult struct {
Warnings string `json:"warnings"`
}
// GetPeerInfoResult models the data returned from the getpeerinfo command.
type GetPeerInfoResult struct {
// GetConnectedPeerInfoResult models the data returned from the getConnectedPeerInfo command.
type GetConnectedPeerInfoResult struct {
ID int32 `json:"id"`
Addr string `json:"addr"`
Services string `json:"services"`
@ -236,6 +239,34 @@ type GetPeerInfoResult struct {
SyncNode bool `json:"syncNode"`
}
// GetPeerAddressesResult models the data returned from the getPeerAddresses command.
type GetPeerAddressesResult struct {
Version int
Key [32]byte
Addresses []*GetPeerAddressesKnownAddressResult
NewBuckets map[string]*GetPeerAddressesNewBucketResult // string is Subnetwork ID
NewBucketFullNodes GetPeerAddressesNewBucketResult
TriedBuckets map[string]*GetPeerAddressesTriedBucketResult // string is Subnetwork ID
TriedBucketFullNodes GetPeerAddressesTriedBucketResult
}
// GetPeerAddressesKnownAddressResult models a GetPeerAddressesResult known address.
type GetPeerAddressesKnownAddressResult struct {
Addr string
Src string
SubnetworkID string
Attempts int
TimeStamp int64
LastAttempt int64
LastSuccess int64
}
// GetPeerAddressesNewBucketResult models a GetPeerAddressesResult new bucket.
type GetPeerAddressesNewBucketResult [addrmgr.NewBucketCount][]string
// GetPeerAddressesTriedBucketResult models a GetPeerAddressesResult tried bucket.
type GetPeerAddressesTriedBucketResult [addrmgr.TriedBucketCount][]string
// GetRawMempoolVerboseResult models the data returned from the getrawmempool
// command when the verbose flag is set. When the verbose flag is not set,
// getrawmempool returns an array of transaction hashes.
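One detail worth noting: unlike GetConnectedPeerInfoResult, these new result types carry no json tags, so they marshal under Go's exported field names. An illustrative sketch (the address values are made up):

package example

import (
	"encoding/json"
	"fmt"

	"github.com/kaspanet/kaspad/rpcmodel"
)

// marshalKnownAddress shows the untagged fields surfacing verbatim in JSON.
func marshalKnownAddress() error {
	addr := rpcmodel.GetPeerAddressesKnownAddressResult{
		Addr:     "173.144.173.111:8333",
		Src:      "173.144.173.112:8333",
		Attempts: 1,
	}
	serialized, err := json.Marshal(&addr)
	if err != nil {
		return err
	}
	fmt.Println(string(serialized)) // {"Addr":"173.144.173.111:8333","Src":...}
	return nil
}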


@ -59,5 +59,5 @@ func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID)
sp.server.AddrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID)
}


@ -34,7 +34,7 @@ func (sp *Peer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
sp.sentAddrs = true
// Get the current known addresses from the address manager.
addrCache := sp.server.addrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID)
addrCache := sp.server.AddrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID)
// Push the addresses.
sp.pushAddrMsg(addrCache, sp.SubnetworkID())


@ -21,7 +21,7 @@ func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// to specified peers and actively avoids advertising and connecting to
// discovered peers.
if !config.ActiveConfig().Simnet {
addrManager := sp.server.addrManager
addrManager := sp.server.AddrManager
// Outbound connections.
if !sp.Inbound() {


@ -214,7 +214,7 @@ type Server struct {
shutdownSched int32
DAGParams *dagconfig.Params
addrManager *addrmgr.AddrManager
AddrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
SigCache *txscript.SigCache
SyncManager *netsync.SyncManager
@ -669,7 +669,7 @@ func (s *Server) handleDonePeerMsg(state *peerState, sp *Peer) {
// Update the address' last seen time if the peer has acknowledged
// our version and has sent us its version as well.
if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
s.addrManager.Connected(sp.NA())
s.AddrManager.Connected(sp.NA())
}
// If we get here it means that either we didn't know about the peer
@ -940,7 +940,7 @@ func newPeerConfig(sp *Peer) *peer.Config {
SelectedTipHash: sp.selectedTipHash,
IsInDAG: sp.blockExists,
AddBanScore: sp.addBanScore,
HostToNetAddress: sp.server.addrManager.HostToNetAddress,
HostToNetAddress: sp.server.AddrManager.HostToNetAddress,
Proxy: config.ActiveConfig().Proxy,
UserAgentName: userAgentName,
UserAgentVersion: userAgentVersion,
@ -981,7 +981,7 @@ func (s *Server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
s.peerConnected(sp, conn)
s.addrManager.Attempt(sp.NA())
s.AddrManager.Attempt(sp.NA())
}
func (s *Server) peerConnected(sp *Peer, conn net.Conn) {
@ -1025,7 +1025,7 @@ func (s *Server) outboundPeerConnectionFailed(connReq *connmgr.ConnReq) {
// take nil for it.
netAddress := wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), defaultServices)
s.addrManager.Attempt(netAddress)
s.AddrManager.Attempt(netAddress)
}
// peerDoneHandler handles peer disconnects by notifying the server that it's
@ -1058,7 +1058,10 @@ func (s *Server) peerHandler() {
// to this handler and rather than adding more channels to synchronize
// things, it's easier and slightly faster to simply start and stop them
// in this handler.
s.addrManager.Start()
err := s.AddrManager.Start()
if err != nil {
panic(errors.Wrap(err, "address manager failed to start"))
}
s.SyncManager.Start()
s.quitWaitGroup.Add(1)
@ -1079,7 +1082,7 @@ func (s *Server) peerHandler() {
// Kaspad uses a lookup of the dns seeder here. Since the seeder returns
// IPs of nodes and not its own IP, we cannot know the real IP of the
// source. So we'll take the first returned address as the source.
s.addrManager.AddAddresses(addrs, addrs[0], subnetworkID)
s.AddrManager.AddAddresses(addrs, addrs[0], subnetworkID)
})
}
@ -1138,7 +1141,7 @@ out:
s.connManager.Stop()
s.SyncManager.Stop()
s.addrManager.Stop()
s.AddrManager.Stop()
// Drain channels before exiting so nothing is left waiting around
// to send.
@ -1431,7 +1434,7 @@ out:
}
na := wire.NewNetAddressIPPort(externalip, uint16(listenPort),
s.services)
err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
err = s.AddrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
if err != nil {
// XXX DeletePortMapping?
}
@ -1465,13 +1468,13 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
services &^= wire.SFNodeBloom
}
amgr := addrmgr.New(config.ActiveConfig().DataDir, serverutils.KaspadLookup, config.ActiveConfig().SubnetworkID)
addressManager := addrmgr.New(serverutils.KaspadLookup, config.ActiveConfig().SubnetworkID)
var listeners []net.Listener
var nat serverutils.NAT
if !config.ActiveConfig().DisableListen {
var err error
listeners, nat, err = initListeners(amgr, listenAddrs, services)
listeners, nat, err = initListeners(addressManager, listenAddrs, services)
if err != nil {
return nil, err
}
@ -1484,7 +1487,7 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
s := Server{
DAGParams: dagParams,
addrManager: amgr,
AddrManager: addressManager,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
@ -1567,7 +1570,7 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
Dial: serverutils.KaspadDial,
OnConnection: s.outboundPeerConnected,
OnConnectionFailed: s.outboundPeerConnectionFailed,
AddrManager: s.addrManager,
AddrManager: s.AddrManager,
})
if err != nil {
return nil, err


@ -6,14 +6,14 @@ import (
"time"
)
// handleGetPeerInfo implements the getPeerInfo command.
func handleGetPeerInfo(s *Server, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
// handleGetConnectedPeerInfo implements the getConnectedPeerInfo command.
func handleGetConnectedPeerInfo(s *Server, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
peers := s.cfg.ConnMgr.ConnectedPeers()
syncPeerID := s.cfg.SyncMgr.SyncPeerID()
infos := make([]*rpcmodel.GetPeerInfoResult, 0, len(peers))
infos := make([]*rpcmodel.GetConnectedPeerInfoResult, 0, len(peers))
for _, p := range peers {
statsSnap := p.ToPeer().StatsSnapshot()
info := &rpcmodel.GetPeerInfoResult{
info := &rpcmodel.GetConnectedPeerInfoResult{
ID: statsSnap.ID,
Addr: statsSnap.Addr,
Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)),


@ -0,0 +1,57 @@
package rpc
import "github.com/kaspanet/kaspad/rpcmodel"
// handleGetPeerAddresses handles getPeerAddresses commands.
func handleGetPeerAddresses(s *Server, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
peersState, err := s.cfg.addressManager.PeersStateForSerialization()
if err != nil {
return nil, err
}
rpcPeersState := rpcmodel.GetPeerAddressesResult{
Version: peersState.Version,
Key: peersState.Key,
Addresses: make([]*rpcmodel.GetPeerAddressesKnownAddressResult, len(peersState.Addresses)),
NewBuckets: make(map[string]*rpcmodel.GetPeerAddressesNewBucketResult),
NewBucketFullNodes: rpcmodel.GetPeerAddressesNewBucketResult{},
TriedBuckets: make(map[string]*rpcmodel.GetPeerAddressesTriedBucketResult),
TriedBucketFullNodes: rpcmodel.GetPeerAddressesTriedBucketResult{},
}
for i, addr := range peersState.Addresses {
rpcPeersState.Addresses[i] = &rpcmodel.GetPeerAddressesKnownAddressResult{
Addr: addr.Addr,
Src: addr.Src,
SubnetworkID: addr.SubnetworkID,
Attempts: addr.Attempts,
TimeStamp: addr.TimeStamp,
LastAttempt: addr.LastAttempt,
LastSuccess: addr.LastSuccess,
}
}
for subnetworkID, bucket := range peersState.NewBuckets {
rpcPeersState.NewBuckets[subnetworkID] = &rpcmodel.GetPeerAddressesNewBucketResult{}
for i, addr := range bucket {
rpcPeersState.NewBuckets[subnetworkID][i] = addr
}
}
for i, addr := range peersState.NewBucketFullNodes {
rpcPeersState.NewBucketFullNodes[i] = addr
}
for subnetworkID, bucket := range peersState.TriedBuckets {
rpcPeersState.TriedBuckets[subnetworkID] = &rpcmodel.GetPeerAddressesTriedBucketResult{}
for i, addr := range bucket {
rpcPeersState.TriedBuckets[subnetworkID][i] = addr
}
}
for i, addr := range peersState.TriedBucketFullNodes {
rpcPeersState.TriedBucketFullNodes[i] = addr
}
return rpcPeersState, nil
}
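For completeness, a hedged sketch of exercising the new command over raw JSON-RPC using only the standard library. The listen address, port, and credentials are placeholders for your node's rpclisten/rpcuser/rpcpass settings, and TLS is assumed disabled:

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	body := []byte(`{"jsonrpc":"1.0","method":"getPeerAddresses","params":[],"id":1}`)
	request, err := http.NewRequest("POST", "http://localhost:16110", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	request.SetBasicAuth("user", "pass") // placeholder credentials
	request.Header.Set("Content-Type", "application/json")

	response, err := http.DefaultClient.Do(request)
	if err != nil {
		panic(err)
	}
	defer response.Body.Close()

	raw, err := ioutil.ReadAll(response.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}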


@ -12,6 +12,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/kaspanet/kaspad/addrmgr"
"io"
"io/ioutil"
"math/rand"
@ -83,7 +84,8 @@ var rpcHandlersBeforeInit = map[string]commandHandler{
"getMempoolInfo": handleGetMempoolInfo,
"getMempoolEntry": handleGetMempoolEntry,
"getNetTotals": handleGetNetTotals,
"getPeerInfo": handleGetPeerInfo,
"getConnectedPeerInfo": handleGetConnectedPeerInfo,
"getPeerAddresses": handleGetPeerAddresses,
"getRawMempool": handleGetRawMempool,
"getSubnetwork": handleGetSubnetwork,
"getTxOut": handleGetTxOut,
@ -783,6 +785,9 @@ type rpcserverConfig struct {
// These fields define any optional indexes the RPC server can make use
// of to provide additional data when queried.
AcceptanceIndex *indexers.AcceptanceIndex
// addressManager defines the address manager for the RPC server to use.
addressManager *addrmgr.AddrManager
}
// setupRPCListeners returns a slice of listeners that are configured for use
@ -855,6 +860,7 @@ func NewRPCServer(
StartupTime: startupTime,
ConnMgr: &rpcConnManager{p2pServer},
SyncMgr: &rpcSyncMgr{p2pServer, p2pServer.SyncManager},
addressManager: p2pServer.AddrManager,
TimeSource: p2pServer.TimeSource,
DAGParams: p2pServer.DAGParams,
TxMemPool: p2pServer.TxMemPool,


@ -416,29 +416,55 @@ var helpDescsEnUS = map[string]string{
"getNetTotalsResult-totalBytesSent": "Total bytes sent",
"getNetTotalsResult-timeMillis": "Number of milliseconds since 1 Jan 1970 GMT",
// GetPeerInfoResult help.
"getPeerInfoResult-id": "A unique node ID",
"getPeerInfoResult-addr": "The ip address and port of the peer",
"getPeerInfoResult-services": "Services bitmask which represents the services supported by the peer",
"getPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it",
"getPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-bytesSent": "Total bytes sent",
"getPeerInfoResult-bytesRecv": "Total bytes received",
"getPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT",
"getPeerInfoResult-timeOffset": "The time offset of the peer",
"getPeerInfoResult-pingTime": "Number of microseconds the last ping took",
"getPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response",
"getPeerInfoResult-version": "The protocol version of the peer",
"getPeerInfoResult-subVer": "The user agent of the peer",
"getPeerInfoResult-inbound": "Whether or not the peer is an inbound connection",
"getPeerInfoResult-selectedTip": "The selected tip of the peer",
"getPeerInfoResult-banScore": "The ban score",
"getPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer",
"getPeerInfoResult-syncNode": "Whether or not the peer is the sync peer",
// GetConnectedPeerInfoResult help.
"getConnectedPeerInfoResult-id": "A unique node ID",
"getConnectedPeerInfoResult-addr": "The ip address and port of the peer",
"getConnectedPeerInfoResult-services": "Services bitmask which represents the services supported by the peer",
"getConnectedPeerInfoResult-relayTxes": "Peer has requested transactions be relayed to it",
"getConnectedPeerInfoResult-lastSend": "Time the last message was received in seconds since 1 Jan 1970 GMT",
"getConnectedPeerInfoResult-lastRecv": "Time the last message was sent in seconds since 1 Jan 1970 GMT",
"getConnectedPeerInfoResult-bytesSent": "Total bytes sent",
"getConnectedPeerInfoResult-bytesRecv": "Total bytes received",
"getConnectedPeerInfoResult-connTime": "Time the connection was made in seconds since 1 Jan 1970 GMT",
"getConnectedPeerInfoResult-timeOffset": "The time offset of the peer",
"getConnectedPeerInfoResult-pingTime": "Number of microseconds the last ping took",
"getConnectedPeerInfoResult-pingWait": "Number of microseconds a queued ping has been waiting for a response",
"getConnectedPeerInfoResult-version": "The protocol version of the peer",
"getConnectedPeerInfoResult-subVer": "The user agent of the peer",
"getConnectedPeerInfoResult-inbound": "Whether or not the peer is an inbound connection",
"getConnectedPeerInfoResult-selectedTip": "The selected tip of the peer",
"getConnectedPeerInfoResult-banScore": "The ban score",
"getConnectedPeerInfoResult-feeFilter": "The requested minimum fee a transaction must have to be announced to the peer",
"getConnectedPeerInfoResult-syncNode": "Whether or not the peer is the sync peer",
// GetPeerInfoCmd help.
"getPeerInfo--synopsis": "Returns data about each connected network peer as an array of json objects.",
// GetConnectedPeerInfoCmd help.
"getConnectedPeerInfo--synopsis": "Returns data about each connected network peer as an array of json objects.",
// GetPeerAddressesResult help.
"getPeerAddressesResult-version": "Peers state serialization version",
"getPeerAddressesResult-key": "Address manager's key for randomness purposes.",
"getPeerAddressesResult-addresses": "The node's known addresses",
"getPeerAddressesResult-newBuckets": "Peers state subnetwork new buckets",
"getPeerAddressesResult-newBuckets--desc": "New buckets keyed by subnetwork ID",
"getPeerAddressesResult-newBuckets--key": "subnetworkId",
"getPeerAddressesResult-newBuckets--value": "New bucket",
"getPeerAddressesResult-newBucketFullNodes": "Peers state full nodes new bucket",
"getPeerAddressesResult-triedBuckets": "Peers state subnetwork tried buckets",
"getPeerAddressesResult-triedBuckets--desc": "Tried buckets keyed by subnetwork ID",
"getPeerAddressesResult-triedBuckets--key": "subnetworkId",
"getPeerAddressesResult-triedBuckets--value": "Tried bucket",
"getPeerAddressesResult-triedBucketFullNodes": "Peers state tried full nodes bucket",
"getPeerAddressesKnownAddressResult-addr": "Address",
"getPeerAddressesKnownAddressResult-src": "Address of the peer that handed the address",
"getPeerAddressesKnownAddressResult-subnetworkId": "Address subnetwork ID",
"getPeerAddressesKnownAddressResult-attempts": "Number of attempts to connect to the address",
"getPeerAddressesKnownAddressResult-timeStamp": "Time the address was added",
"getPeerAddressesKnownAddressResult-lastAttempt": "Last attempt to connect to the address",
"getPeerAddressesKnownAddressResult-lastSuccess": "Last successful attempt to connect to the address",
// GetPeerAddressesCmd help.
"getPeerAddresses--synopsis": "Returns the peers state.",
// GetRawMempoolVerboseResult help.
"getRawMempoolVerboseResult-size": "Transaction size in bytes",
@ -488,7 +514,7 @@ var helpDescsEnUS = map[string]string{
// PingCmd help.
"ping--synopsis": "Queues a ping to be sent to each connected peer.\n" +
"Ping times are provided by getPeerInfo via the pingtime and pingwait fields.",
"Ping times are provided by getConnectedPeerInfo via the pingtime and pingwait fields.",
// RemoveManualNodeCmd help.
"removeManualNode--synopsis": "Removes a peer from the manual nodes list",
@ -616,7 +642,8 @@ var rpcResultTypes = map[string][]interface{}{
"getMempoolInfo": {(*rpcmodel.GetMempoolInfoResult)(nil)},
"getMempoolEntry": {(*rpcmodel.GetMempoolEntryResult)(nil)},
"getNetTotals": {(*rpcmodel.GetNetTotalsResult)(nil)},
"getPeerInfo": {(*[]rpcmodel.GetPeerInfoResult)(nil)},
"getConnectedPeerInfo": {(*[]rpcmodel.GetConnectedPeerInfoResult)(nil)},
"getPeerAddresses": {(*[]rpcmodel.GetPeerAddressesResult)(nil)},
"getRawMempool": {(*[]string)(nil), (*rpcmodel.GetRawMempoolVerboseResult)(nil)},
"getSubnetwork": {(*rpcmodel.GetSubnetworkResult)(nil)},
"getTxOut": {(*rpcmodel.GetTxOutResult)(nil)},