Compare commits

..

1 Commit

Author SHA1 Message Date
Svarog
686c25c72d [NOD-1064] Don't send GetBlockInvsMsg with lowHash = nil (#769) 2020-06-21 08:58:29 +03:00
6 changed files with 19 additions and 53 deletions

View File

@@ -157,7 +157,6 @@ type SyncManager struct {
msgChan chan interface{}
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
@@ -171,8 +170,6 @@ type SyncManager struct {
// download/sync the blockDAG from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
//
// This function MUST be called with the sync peer lock held.
func (sm *SyncManager) startSync() {
// Return now if we're already syncing.
if sm.syncPeer != nil {
@@ -192,7 +189,6 @@ func (sm *SyncManager) startSync() {
// TODO(davec): Use a better algorithm to choose the sync peer.
// For now, just pick the first available candidate.
syncPeer = peer
break
}
// Start syncing from the sync peer if one was selected.
@@ -298,8 +294,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
}
// Start syncing by choosing the best candidate if needed.
if isSyncCandidate {
sm.restartSyncIfNeeded()
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
}
}
@@ -341,7 +337,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// sync peer.
if sm.syncPeer == peer {
sm.syncPeer = nil
sm.restartSyncIfNeeded()
sm.startSync()
}
}
@@ -431,34 +427,24 @@ func (sm *SyncManager) current() bool {
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
sm.syncPeerLock.Lock()
defer sm.syncPeerLock.Unlock()
if !sm.shouldReplaceSyncPeer() {
return
if sm.syncPeer != nil {
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if exists {
isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}()
if isWaitingForBlocks {
return
}
}
}
sm.syncPeer = nil
sm.startSync()
}
func (sm *SyncManager) shouldReplaceSyncPeer() bool {
if sm.syncPeer == nil {
return true
}
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if !exists {
panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
}
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) == 0 &&
len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
!sm.syncPeer.WasBlockLocatorRequested()
}
// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
@@ -919,7 +905,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
return
}
peer.SetSelectedTipHash(selectedTipHash)
sm.restartSyncIfNeeded()
sm.startSync()
}
// messageHandler is the main handler for the sync manager. It must be run as a

View File

@@ -414,8 +414,6 @@ type Peer struct {
prevGetBlockInvsLow *daghash.Hash
prevGetBlockInvsHigh *daghash.Hash
wasBlockLocatorRequested bool
// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
@@ -437,20 +435,6 @@ type Peer struct {
quit chan struct{}
}
// WasBlockLocatorRequested returns whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) WasBlockLocatorRequested() bool {
return p.wasBlockLocatorRequested
}
// SetWasBlockLocatorRequested sets whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
p.wasBlockLocatorRequested = wasBlockLocatorRequested
}
// String returns the peer's address and directionality as a human-readable
// string.
//
@@ -791,7 +775,6 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
//
// This function is safe for concurrent access.
func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
p.SetWasBlockLocatorRequested(true)
msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
p.QueueMessage(msg, nil)
}

View File

@@ -607,7 +607,7 @@ func TestOutboundPeer(t *testing.T) {
t.Errorf("PushAddrMsg: unexpected err %v\n", err)
return
}
if err := p2.PushGetBlockInvsMsg(nil, &daghash.Hash{}); err != nil {
if err := p2.PushGetBlockInvsMsg(&daghash.Hash{}, &daghash.Hash{}); err != nil {
t.Errorf("PushGetBlockInvsMsg: unexpected err %v\n", err)
return
}

View File

@@ -8,7 +8,6 @@ import (
// OnBlockLocator is invoked when a peer receives a locator kaspa
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
sp.SetWasBlockLocatorRequested(false)
// Find the highest known shared block between the peers, and asks
// the block and its future from the peer. If the block is not
// found, create a lower resolution block locator and send it to

View File

@@ -24,7 +24,6 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
log.Criticalf("Goroutine stack trace: %s", goroutineStackTrace)
}
log.Criticalf("Stack trace: %s", debug.Stack())
log.Backend().Close()
close(panicHandlerDone)
}()
@@ -35,9 +34,8 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
fmt.Fprintln(os.Stderr, "Couldn't handle a fatal error. Exiting...")
case <-panicHandlerDone:
}
fmt.Print("Exiting...")
log.Criticalf("Exiting")
os.Exit(1)
fmt.Print("After os.Exit(1)")
}
// GoroutineWrapperFunc returns a goroutine wrapper function that handles panics and writes them to the log.

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 3
appPatch uint = 1
appPatch uint = 0
)
// appBuild is defined as a variable so it can be overridden during the build