Compare commits

...

11 Commits

Author SHA1 Message Date
stasatdaglabs
3e5a840c5a [NOD-1052] Add a lock around clearOldEntries to protect against concurrent access of utxoDiffStore.loaded. (#758) 2020-06-11 11:56:25 +03:00
Ori Newman
d6d34238d2 [NOD-1049] Allow empty addr messages (#753) 2020-06-10 16:13:13 +03:00
Ori Newman
8bbced5925 [NOD-1051] Don't disconnect from sync peer if it sends an orphan (#757) 2020-06-10 16:05:48 +03:00
stasatdaglabs
20da1b9c9a [NOD-1048] Make leveldb compaction much less frequent (#756)
* [NOD-1048] Make leveldb compaction much less frequent. Also, allocate an entire gigabyte for leveldb's blockCache and writeBuffer.

* [NOD-1048] Implement changing the options for testing purposes.

* [NOD-1048] Rename originalOptions to originalLDBOptions.

* [NOD-1048] Add a comment.
2020-06-10 16:05:02 +03:00
stasatdaglabs
222477b33e [NOD-1040] Don't remove DAG tips from the diffStore's loaded set (#750)
* [NOD-1040] Don't remove DAG tips from the diffStore's loaded set

* [NOD-1040] Remove a debug log.
2020-06-08 12:14:58 +03:00
Mike Zak
4a50d94633 Update to v0.4.1 2020-06-07 17:54:30 +03:00
stasatdaglabs
b4dba782fb [NOD-1040] Increase maxBlueScoreDifferenceToKeepLoaded to 1500 (#746)
* [NOD-1040] Don't remove DAG tips from the diffStore's loaded set

* [NOD-1040] Fix TestClearOldEntries.

* Revert "[NOD-1040] Fix TestClearOldEntries."

This reverts commit e0705814

* Revert "[NOD-1040] Don't remove DAG tips from the diffStore's loaded set"

This reverts commit d3eba1c1

* [NOD-1040] Increase maxBlueScoreDifferenceToKeepLoaded to 1500.
2020-06-07 17:50:57 +03:00
stasatdaglabs
9c78a797e4 [NOD-1041] Call outboundPeerConnected and outboundPeerConnectionFailed directly instead of routing them through peerHandler (#748)
* [NOD-1041] Fix a deadlock between connHandler and peerHandler.

* [NOD-1041] Simplified the fix.
2020-06-07 16:35:48 +03:00
Ori Newman
35c733a4c1 [NOD-970] Add isSyncing flag (#747)
* [NOD-970] Add isSyncing flag

* [NOD-970] Rename shouldSendSelectedTip->peerShouldSendSelectedTip
2020-06-07 16:31:17 +03:00
stasatdaglabs
96930bd6ea [NOD-1039] Remove the call to SetGCPercent. (#745) 2020-06-07 09:19:28 +03:00
stasatdaglabs
d15c009b3c [NOD-1030] Disconnect from syncPeers that send orphan blocks (#744)
* [NOD-1030] Disconnect from syncPeers that send orphan blocks.

* [NOD-1030] Remove debug log.

* [NOD-1030] Remove unnecessary call to stopSyncFromPeer.
2020-06-04 15:11:05 +03:00
10 changed files with 122 additions and 123 deletions

View File

@@ -5,9 +5,11 @@ package blockdag
import (
"compress/bzip2"
"encoding/binary"
"github.com/kaspanet/kaspad/database/ffldb/ldb"
"github.com/kaspanet/kaspad/dbaccess"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"io"
"io/ioutil"
"os"
@@ -62,6 +64,15 @@ func DAGSetup(dbName string, openDb bool, config Config) (*BlockDAG, func(), err
return nil, nil, errors.Errorf("error creating temp dir: %s", err)
}
// We set ldb.Options here to return nil because normally
// the database is initialized with very large caches that
// can make opening/closing the database for every test
// quite heavy.
originalLDBOptions := ldb.Options
ldb.Options = func() *opt.Options {
return nil
}
dbPath := filepath.Join(tmpDir, dbName)
_ = os.RemoveAll(dbPath)
err = dbaccess.Open(dbPath)
@@ -75,6 +86,7 @@ func DAGSetup(dbName string, openDb bool, config Config) (*BlockDAG, func(), err
spawnWaitGroup.Wait()
spawn = realSpawn
dbaccess.Close()
ldb.Options = originalLDBOptions
os.RemoveAll(dbPath)
}
} else {

View File

@@ -156,17 +156,24 @@ func (diffStore *utxoDiffStore) clearDirtyEntries() {
var maxBlueScoreDifferenceToKeepLoaded uint64 = 100
// clearOldEntries removes entries whose blue score is lower than
// virtual.blueScore - maxBlueScoreDifferenceToKeepLoaded.
// virtual.blueScore - maxBlueScoreDifferenceToKeepLoaded. Note
// that tips are not removed either even if their blue score is
// lower than the above.
func (diffStore *utxoDiffStore) clearOldEntries() {
diffStore.mtx.HighPriorityWriteLock()
defer diffStore.mtx.HighPriorityWriteUnlock()
virtualBlueScore := diffStore.dag.VirtualBlueScore()
minBlueScore := virtualBlueScore - maxBlueScoreDifferenceToKeepLoaded
if maxBlueScoreDifferenceToKeepLoaded > virtualBlueScore {
minBlueScore = 0
}
tips := diffStore.dag.virtual.tips()
toRemove := make(map[*blockNode]struct{})
for node := range diffStore.loaded {
if node.blueScore < minBlueScore {
if node.blueScore < minBlueScore && !tips.contains(node) {
toRemove[node] = struct{}{}
}
}

View File

@@ -149,10 +149,11 @@ func TestClearOldEntries(t *testing.T) {
t.Fatalf("TestClearOldEntries: missing blockNode for hash %s", processedBlock.BlockHash())
}
// Make sure that the child-of-genesis node isn't in the loaded set
// Make sure that the child-of-genesis node is in the loaded set, since it
// is a tip.
_, ok := dag.utxoDiffStore.loaded[node]
if ok {
t.Fatalf("TestClearOldEntries: diffData for node %s is in the loaded set", node.hash)
if !ok {
t.Fatalf("TestClearOldEntries: diffData for node %s is not in the loaded set", node.hash)
}
// Make sure that all the old nodes still do not exist in the loaded set

View File

@@ -15,7 +15,7 @@ type LevelDB struct {
// NewLevelDB opens a leveldb instance defined by the given path.
func NewLevelDB(path string) (*LevelDB, error) {
// Open leveldb. If it doesn't exist, create it.
ldb, err := leveldb.OpenFile(path, nil)
ldb, err := leveldb.OpenFile(path, Options())
// If the database is corrupted, attempt to recover.
if _, corrupted := err.(*ldbErrors.ErrCorrupted); corrupted {

View File

@@ -0,0 +1,19 @@
package ldb
import "github.com/syndtr/goleveldb/leveldb/opt"
var (
defaultOptions = opt.Options{
Compression: opt.NoCompression,
BlockCacheCapacity: 512 * opt.MiB,
WriteBuffer: 512 * opt.MiB,
IteratorSamplingRate: 512 * opt.MiB,
}
// Options is a function that returns a leveldb
// opt.Options struct for opening a database.
// It's defined as a variable for the sake of testing.
Options = func() *opt.Options {
return &defaultOptions
}
)

View File

@@ -10,7 +10,6 @@ import (
"os"
"path/filepath"
"runtime"
"runtime/debug"
"runtime/pprof"
"strings"
"time"
@@ -265,12 +264,6 @@ func main() {
// Use all processor cores.
runtime.GOMAXPROCS(runtime.NumCPU())
// Block and transaction processing can cause bursty allocations. This
// limits the garbage collector from excessively overallocating during
// bursts. This value was arrived at with the help of profiling live
// usage.
debug.SetGCPercent(10)
// Up some limits.
if err := limits.SetLimits(); err != nil {
fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err)

View File

@@ -132,13 +132,13 @@ type requestQueueAndSet struct {
// peerSyncState stores additional information that the SyncManager tracks
// about a peer.
type peerSyncState struct {
syncCandidate bool
lastSelectedTipRequest time.Time
isPendingForSelectedTip bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
syncCandidate bool
lastSelectedTipRequest time.Time
peerShouldSendSelectedTip bool
requestQueueMtx sync.Mutex
requestQueues map[wire.InvType]*requestQueueAndSet
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
}
// SyncManager is used to communicate block related messages with peers. The
@@ -158,6 +158,7 @@ type SyncManager struct {
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex
isSyncing bool
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
@@ -206,13 +207,21 @@ func (sm *SyncManager) startSync() {
syncPeer.SelectedTipHash(), syncPeer.Addr())
syncPeer.PushGetBlockLocatorMsg(syncPeer.SelectedTipHash(), sm.dagParams.GenesisHash)
sm.isSyncing = true
sm.syncPeer = syncPeer
return
}
pendingForSelectedTips := false
if sm.shouldQueryPeerSelectedTips() {
sm.isSyncing = true
hasSyncCandidates := false
for peer, state := range sm.peerStates {
if state.peerShouldSendSelectedTip {
pendingForSelectedTips = true
continue
}
if !state.syncCandidate {
continue
}
@@ -222,21 +231,26 @@ func (sm *SyncManager) startSync() {
continue
}
queueMsgGetSelectedTip(peer, state)
sm.queueMsgGetSelectedTip(peer, state)
pendingForSelectedTips = true
}
if !hasSyncCandidates {
log.Warnf("No sync peer candidates available")
}
}
if !pendingForSelectedTips {
sm.isSyncing = false
}
}
func (sm *SyncManager) shouldQueryPeerSelectedTips() bool {
return sm.dag.Now().Sub(sm.dag.CalcPastMedianTime()) > minDAGTimeDelay
}
func queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
func (sm *SyncManager) queueMsgGetSelectedTip(peer *peerpkg.Peer, state *peerSyncState) {
state.lastSelectedTipRequest = time.Now()
state.isPendingForSelectedTip = true
state.peerShouldSendSelectedTip = true
peer.QueueMessage(wire.NewMsgGetSelectedTip(), nil)
}
@@ -417,17 +431,6 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
}
// current returns true if we believe we are synced with our peers, false if we
// still have blocks to check
//
// We consider ourselves current iff both of the following are true:
// 1. there's no syncPeer, a.k.a. all connected peers are at the same tip
// 2. the DAG considers itself current - to prevent attacks where a peer sends an
// unknown tip but never lets us sync to it.
func (sm *SyncManager) current() bool {
return sm.syncPeer == nil && sm.dag.IsCurrent()
}
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
@@ -529,8 +532,8 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
return
}
// Request the parents for the orphan block from the peer that sent it.
if isOrphan {
// Request the parents for the orphan block from the peer that sent it.
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(blockHash)
if err != nil {
log.Errorf("Failed to find missing ancestors for block %s: %s",
@@ -754,7 +757,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
log.Errorf("Failed to send invs from queue: %s", err)
}
if haveUnknownInvBlock && !sm.current() {
if haveUnknownInvBlock && !sm.isSyncing {
// If one of the inv messages is an unknown block
// it is an indication that one of our peers has more
// up-to-date data than us.
@@ -839,7 +842,7 @@ func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerS
if err != nil {
return err
}
if sm.syncPeer == nil || sm.isSynced() {
if !sm.isSyncing || sm.isSynced() {
err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
if err != nil {
return err
@@ -909,12 +912,12 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
peer := msg.peer
selectedTipHash := msg.selectedTipHash
state := sm.peerStates[peer]
if !state.isPendingForSelectedTip {
if !state.peerShouldSendSelectedTip {
log.Warnf("Got unrequested selected tip message from %s -- "+
"disconnecting", peer.Addr())
peer.Disconnect()
}
state.isPendingForSelectedTip = false
state.peerShouldSendSelectedTip = false
if selectedTipHash.IsEqual(peer.SelectedTipHash()) {
return
}
@@ -1019,7 +1022,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
// Relay if we are current and the block was not just now unorphaned.
// Otherwise peers that are current should already know about it
if sm.current() && !data.WasUnorphaned {
if sm.isSynced() && !data.WasUnorphaned {
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
}

View File

@@ -18,14 +18,6 @@ func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
return
}
// A message that has no addresses is invalid.
if len(msg.AddrList) == 0 {
peerLog.Errorf("Command [%s] from %s does not contain any addresses",
msg.Command(), sp.Peer)
sp.Disconnect()
return
}
if msg.IncludeAllSubnetworks {
peerLog.Errorf("Got unexpected IncludeAllSubnetworks=true in [%s] command from %s",
msg.Command(), sp.Peer)

View File

@@ -109,15 +109,6 @@ type relayMsg struct {
data interface{}
}
type outboundPeerConnectedMsg struct {
connReq *connmgr.ConnReq
conn net.Conn
}
type outboundPeerConnectionFailedMsg struct {
connReq *connmgr.ConnReq
}
// Peer extends the peer to maintain state shared by the server and
// the blockmanager.
type Peer struct {
@@ -229,19 +220,17 @@ type Server struct {
DAG *blockdag.BlockDAG
TxMemPool *mempool.TxPool
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
newOutboundConnection chan *outboundPeerConnectedMsg
newOutboundConnectionFailed chan *outboundPeerConnectionFailedMsg
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
modifyRebroadcastInv chan interface{}
newPeers chan *Peer
donePeers chan *Peer
banPeers chan *Peer
Query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat serverutils.NAT
TimeSource blockdag.TimeSource
services wire.ServiceFlag
// We add to quitWaitGroup before every instance in which we wait for
// the quit channel so that all those instances finish before we shut
@@ -977,17 +966,17 @@ func (s *Server) inboundPeerConnected(conn net.Conn) {
// peer instance, associates it with the relevant state such as the connection
// request instance and the connection itself, and finally notifies the address
// manager of the attempt.
func (s *Server) outboundPeerConnected(state *peerState, msg *outboundPeerConnectedMsg) {
sp := newServerPeer(s, msg.connReq.Permanent)
outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), msg.connReq.Addr.String())
func (s *Server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
sp := newServerPeer(s, connReq.Permanent)
outboundPeer, err := peer.NewOutboundPeer(newPeerConfig(sp), connReq.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot create outbound peer %s: %s", msg.connReq.Addr, err)
s.connManager.Disconnect(msg.connReq.ID())
srvrLog.Debugf("Cannot create outbound peer %s: %s", connReq.Addr, err)
s.connManager.Disconnect(connReq.ID())
}
sp.Peer = outboundPeer
sp.connReq = msg.connReq
sp.connReq = connReq
s.peerConnected(sp, msg.conn)
s.peerConnected(sp, conn)
s.addrManager.Attempt(sp.NA())
}
@@ -1012,20 +1001,20 @@ func (s *Server) peerConnected(sp *Peer, conn net.Conn) {
// outboundPeerConnected is invoked by the connection manager when a new
// outbound connection failed to be established.
func (s *Server) outboundPeerConnectionFailed(msg *outboundPeerConnectionFailedMsg) {
func (s *Server) outboundPeerConnectionFailed(connReq *connmgr.ConnReq) {
// If the connection request has no address
// associated to it, do nothing.
if msg.connReq.Addr == nil {
if connReq.Addr == nil {
return
}
host, portStr, err := net.SplitHostPort(msg.connReq.Addr.String())
host, portStr, err := net.SplitHostPort(connReq.Addr.String())
if err != nil {
srvrLog.Debugf("Cannot extract address host and port %s: %s", msg.connReq.Addr, err)
srvrLog.Debugf("Cannot extract address host and port %s: %s", connReq.Addr, err)
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
srvrLog.Debugf("Cannot parse port %s: %s", msg.connReq.Addr, err)
srvrLog.Debugf("Cannot parse port %s: %s", connReq.Addr, err)
}
// defaultServices is used here because Attempt makes no use
@@ -1137,12 +1126,6 @@ out:
})
s.quitWaitGroup.Done()
break out
case opcMsg := <-s.newOutboundConnection:
s.outboundPeerConnected(state, opcMsg)
case opcfMsg := <-s.newOutboundConnectionFailed:
s.outboundPeerConnectionFailed(opcfMsg)
}
}
@@ -1497,23 +1480,21 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
s := Server{
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
newOutboundConnectionFailed: make(chan *outboundPeerConnectionFailedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
nat: nat,
TimeSource: blockdag.NewTimeSource(),
services: services,
SigCache: txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize),
notifyNewTransactions: notifyNewTransactions,
}
// Create indexes if needed.
@@ -1576,23 +1557,14 @@ func NewServer(listenAddrs []string, dagParams *dagconfig.Params, interrupt <-ch
// Create a connection manager.
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.KaspadDial,
OnConnection: func(c *connmgr.ConnReq, conn net.Conn) {
s.newOutboundConnection <- &outboundPeerConnectedMsg{
connReq: c,
conn: conn,
}
},
OnConnectionFailed: func(c *connmgr.ConnReq) {
s.newOutboundConnectionFailed <- &outboundPeerConnectionFailedMsg{
connReq: c,
}
},
AddrManager: s.addrManager,
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.KaspadDial,
OnConnection: s.outboundPeerConnected,
OnConnectionFailed: s.outboundPeerConnectionFailed,
AddrManager: s.addrManager,
})
if err != nil {
return nil, err

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 4
appPatch uint = 0
appPatch uint = 1
)
// appBuild is defined as a variable so it can be overridden during the build