Compare commits

...

3 Commits

Author SHA1 Message Date
Svarog
fee7b631f6 [NOD-869] Add a print after os.Exit(1) to see if it is ever called (#701) 2020-04-16 15:03:41 +03:00
Ori Newman
3fd647b291 [NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer (#700)
* [NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer

* [NOD-858] SetShouldSendBlockLocator(false) on OnBlockLocator

* [NOD-858] Rename shouldSendBlockLocator->wasBlockLocatorRequested

* [NOD-858] Move panic to shouldReplaceSyncPeer
2020-04-13 15:49:46 +03:00
Mike Zak
2f255952b7 Updated to version v0.3.1 2020-04-13 15:10:27 +03:00
5 changed files with 52 additions and 18 deletions

View File

@@ -157,6 +157,7 @@ type SyncManager struct {
msgChan chan interface{}
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
@@ -170,6 +171,8 @@ type SyncManager struct {
// download/sync the blockDAG from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
//
// This function MUST be called with the sync peer lock held.
func (sm *SyncManager) startSync() {
// Return now if we're already syncing.
if sm.syncPeer != nil {
@@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() {
// TODO(davec): Use a better algorithm to choose the sync peer.
// For now, just pick the first available candidate.
syncPeer = peer
break
}
// Start syncing from the sync peer if one was selected.
@@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
}
// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
if isSyncCandidate {
sm.restartSyncIfNeeded()
}
}
@@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// sync peer.
if sm.syncPeer == peer {
sm.syncPeer = nil
sm.startSync()
sm.restartSyncIfNeeded()
}
}
@@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool {
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
if sm.syncPeer != nil {
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if exists {
isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}()
if isWaitingForBlocks {
return
}
}
sm.syncPeerLock.Lock()
defer sm.syncPeerLock.Unlock()
if !sm.shouldReplaceSyncPeer() {
return
}
sm.syncPeer = nil
sm.startSync()
}
func (sm *SyncManager) shouldReplaceSyncPeer() bool {
if sm.syncPeer == nil {
return true
}
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if !exists {
panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
}
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) == 0 &&
len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
!sm.syncPeer.WasBlockLocatorRequested()
}
// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
@@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
return
}
peer.SetSelectedTipHash(selectedTipHash)
sm.startSync()
sm.restartSyncIfNeeded()
}
// messageHandler is the main handler for the sync manager. It must be run as a

View File

@@ -414,6 +414,8 @@ type Peer struct {
prevGetBlockInvsLow *daghash.Hash
prevGetBlockInvsHigh *daghash.Hash
wasBlockLocatorRequested bool
// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
@@ -435,6 +437,20 @@ type Peer struct {
quit chan struct{}
}
// WasBlockLocatorRequested returns whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) WasBlockLocatorRequested() bool {
return p.wasBlockLocatorRequested
}
// SetWasBlockLocatorRequested sets whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
p.wasBlockLocatorRequested = wasBlockLocatorRequested
}
// String returns the peer's address and directionality as a human-readable
// string.
//
@@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
//
// This function is safe for concurrent access.
func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
p.SetWasBlockLocatorRequested(true)
msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
p.QueueMessage(msg, nil)
}

View File

@@ -8,6 +8,7 @@ import (
// OnBlockLocator is invoked when a peer receives a locator kaspa
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
sp.SetWasBlockLocatorRequested(false)
// Find the highest known shared block between the peers, and asks
// the block and its future from the peer. If the block is not
// found, create a lower resolution block locator and send it to

View File

@@ -24,6 +24,7 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
log.Criticalf("Goroutine stack trace: %s", goroutineStackTrace)
}
log.Criticalf("Stack trace: %s", debug.Stack())
log.Backend().Close()
close(panicHandlerDone)
}()
@@ -34,8 +35,9 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
fmt.Fprintln(os.Stderr, "Couldn't handle a fatal error. Exiting...")
case <-panicHandlerDone:
}
log.Criticalf("Exiting")
fmt.Print("Exiting...")
os.Exit(1)
fmt.Print("After os.Exit(1)")
}
// GoroutineWrapperFunc returns a goroutine wrapper function that handles panics and writes them to the log.

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 3
appPatch uint = 0
appPatch uint = 1
)
// appBuild is defined as a variable so it can be overridden during the build