mirror of https://github.com/kaspanet/kaspad.git (synced 2026-02-21 03:03:08 +00:00)

Compare commits: v0.7.2-dev...v0.3.1-rc3

3 commits: c88869778d, 3fd647b291, 2f255952b7
@@ -157,6 +157,7 @@ type SyncManager struct {
 	msgChan      chan interface{}
 	wg           sync.WaitGroup
 	quit         chan struct{}
+	syncPeerLock sync.Mutex

 	// These fields should only be accessed from the messageHandler thread
 	rejectedTxns map[daghash.TxID]struct{}
@@ -170,6 +171,8 @@ type SyncManager struct {
 // download/sync the blockDAG from. When syncing is already running, it
 // simply returns. It also examines the candidates for any which are no longer
 // candidates and removes them as needed.
+//
+// This function MUST be called with the sync peer lock held.
 func (sm *SyncManager) startSync() {
 	// Return now if we're already syncing.
 	if sm.syncPeer != nil {
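The two added comment lines document a locking contract rather than a behavior change: startSync must only run while the syncPeerLock added in the previous hunk is held. A minimal, self-contained sketch of that contract, using simplified stand-in types rather than kaspad's real SyncManager:

```go
package main

import (
	"fmt"
	"sync"
)

type syncManager struct {
	syncPeerLock sync.Mutex // guards syncPeer, per the new contract
	syncPeer     string
}

// startSync assumes the caller already holds syncPeerLock, mirroring
// the "MUST be called with the sync peer lock held" comment above.
func (sm *syncManager) startSync() {
	if sm.syncPeer != "" {
		return // already syncing
	}
	sm.syncPeer = "first-candidate"
}

func main() {
	sm := &syncManager{}
	sm.syncPeerLock.Lock()
	sm.startSync()
	sm.syncPeerLock.Unlock()
	fmt.Println("sync peer:", sm.syncPeer)
}
```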
@@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() {
 		// TODO(davec): Use a better algorithm to choose the sync peer.
 		// For now, just pick the first available candidate.
 		syncPeer = peer
+		break
 	}

 	// Start syncing from the sync peer if one was selected.
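With the added break, the loop now stops at the first available candidate, as the TODO comment says, instead of scanning the remaining peers and keeping the last one. A standalone sketch of that selection pattern (the peer keys and candidate flag are placeholders):

```go
package main

import "fmt"

type peerState struct{ syncCandidate bool }

// pickSyncPeer returns the first available sync candidate, mirroring
// the "just pick the first available candidate" loop above.
func pickSyncPeer(peerStates map[string]*peerState) string {
	for addr, state := range peerStates {
		if !state.syncCandidate {
			continue
		}
		return addr // the break in the diff plays the same role
	}
	return ""
}

func main() {
	peers := map[string]*peerState{
		"192.0.2.1:16111": {syncCandidate: true},
	}
	fmt.Println("sync peer:", pickSyncPeer(peers))
}
```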
@@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
 	}

 	// Start syncing by choosing the best candidate if needed.
-	if isSyncCandidate && sm.syncPeer == nil {
-		sm.startSync()
+	if isSyncCandidate {
+		sm.restartSyncIfNeeded()
 	}
 }

@@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
 	// sync peer.
 	if sm.syncPeer == peer {
 		sm.syncPeer = nil
-		sm.startSync()
+		sm.restartSyncIfNeeded()
 	}
 }

@@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool {
 // restartSyncIfNeeded finds a new sync candidate if we're not expecting any
 // blocks from the current one.
 func (sm *SyncManager) restartSyncIfNeeded() {
-	if sm.syncPeer != nil {
-		syncPeerState, exists := sm.peerStates[sm.syncPeer]
-		if exists {
-			isWaitingForBlocks := func() bool {
-				syncPeerState.requestQueueMtx.Lock()
-				defer syncPeerState.requestQueueMtx.Unlock()
-				return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
-			}()
-			if isWaitingForBlocks {
-				return
-			}
-		}
-	}
+	sm.syncPeerLock.Lock()
+	defer sm.syncPeerLock.Unlock()
+
+	if !sm.shouldReplaceSyncPeer() {
+		return
+	}

 	sm.syncPeer = nil
 	sm.startSync()
 }

+func (sm *SyncManager) shouldReplaceSyncPeer() bool {
+	if sm.syncPeer == nil {
+		return true
+	}
+
+	syncPeerState, exists := sm.peerStates[sm.syncPeer]
+	if !exists {
+		panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
+	}
+
+	syncPeerState.requestQueueMtx.Lock()
+	defer syncPeerState.requestQueueMtx.Unlock()
+	return len(syncPeerState.requestedBlocks) == 0 &&
+		len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
+		!sm.syncPeer.WasBlockLocatorRequested()
+}
+
 // handleBlockMsg handles block messages from all peers.
 func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 	peer := bmsg.peer
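This is the heart of the change: the old inline "is the peer waiting for blocks?" closure becomes a named predicate, shouldReplaceSyncPeer, and the whole operation now runs under the new syncPeerLock. A self-contained sketch of the resulting check-then-replace flow, with heavily simplified state standing in for kaspad's peer and request-queue types (the WasBlockLocatorRequested clause is omitted here for brevity):

```go
package main

import (
	"fmt"
	"sync"
)

type peerSyncState struct {
	requestQueueMtx sync.Mutex
	requestedBlocks map[string]struct{}
	requestQueue    []string
}

type syncManager struct {
	syncPeerLock sync.Mutex
	syncPeer     *peerSyncState
}

// shouldReplaceSyncPeer reports whether the sync peer is idle:
// no sync peer at all, or nothing requested and nothing queued.
func (sm *syncManager) shouldReplaceSyncPeer() bool {
	if sm.syncPeer == nil {
		return true
	}
	sm.syncPeer.requestQueueMtx.Lock()
	defer sm.syncPeer.requestQueueMtx.Unlock()
	return len(sm.syncPeer.requestedBlocks) == 0 &&
		len(sm.syncPeer.requestQueue) == 0
}

// restartSyncIfNeeded drops an idle sync peer so a new one can be
// chosen; like the real code, the decision and the reset share one lock.
func (sm *syncManager) restartSyncIfNeeded() {
	sm.syncPeerLock.Lock()
	defer sm.syncPeerLock.Unlock()

	if !sm.shouldReplaceSyncPeer() {
		return
	}
	sm.syncPeer = nil
	// startSync() would run here, under the same lock.
}

func main() {
	busy := &peerSyncState{
		requestedBlocks: map[string]struct{}{"some-block": {}},
	}
	sm := &syncManager{syncPeer: busy}
	sm.restartSyncIfNeeded()
	fmt.Println("busy peer kept:", sm.syncPeer != nil) // true
}
```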
@@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
 		return
 	}
 	peer.SetSelectedTipHash(selectedTipHash)
-	sm.startSync()
+	sm.restartSyncIfNeeded()
 }

 // messageHandler is the main handler for the sync manager. It must be run as a
peer/peer.go (17 lines changed)

@@ -414,6 +414,8 @@ type Peer struct {
 	prevGetBlockInvsLow  *daghash.Hash
 	prevGetBlockInvsHigh *daghash.Hash

+	wasBlockLocatorRequested bool
+
 	// These fields keep track of statistics for the peer and are protected
 	// by the statsMtx mutex.
 	statsMtx sync.RWMutex
@@ -435,6 +437,20 @@ type Peer struct {
 	quit chan struct{}
 }

+// WasBlockLocatorRequested returns whether the node
+// is expecting to get a block locator from this
+// peer.
+func (p *Peer) WasBlockLocatorRequested() bool {
+	return p.wasBlockLocatorRequested
+}
+
+// SetWasBlockLocatorRequested sets whether the node
+// is expecting to get a block locator from this
+// peer.
+func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
+	p.wasBlockLocatorRequested = wasBlockLocatorRequested
+}
+
 // String returns the peer's address and directionality as a human-readable
 // string.
 //
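A detail worth flagging: Peer's statistics fields are documented as protected by statsMtx, and PushGetBlockLocatorMsg below is documented as safe for concurrent access, yet the new wasBlockLocatorRequested field is a plain bool with unsynchronized accessors. If these methods can race across goroutines, an atomic would make the access explicitly safe; a hedged sketch of that alternative (an observation, not what this commit does):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type peer struct {
	// 1 means a block locator has been requested; 0 means it has not.
	wasBlockLocatorRequested int32
}

func (p *peer) WasBlockLocatorRequested() bool {
	return atomic.LoadInt32(&p.wasBlockLocatorRequested) == 1
}

func (p *peer) SetWasBlockLocatorRequested(requested bool) {
	var v int32
	if requested {
		v = 1
	}
	atomic.StoreInt32(&p.wasBlockLocatorRequested, v)
}

func main() {
	p := &peer{}
	p.SetWasBlockLocatorRequested(true)
	fmt.Println(p.WasBlockLocatorRequested()) // true
}
```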
@@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
 //
 // This function is safe for concurrent access.
 func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
+	p.SetWasBlockLocatorRequested(true)
 	msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
 	p.QueueMessage(msg, nil)
 }
@@ -8,6 +8,7 @@ import (
 // OnBlockLocator is invoked when a peer receives a locator kaspa
 // message.
 func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
+	sp.SetWasBlockLocatorRequested(false)
 	// Find the highest known shared block between the peers, and asks
 	// the block and its future from the peer. If the block is not
 	// found, create a lower resolution block locator and send it to
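The last three hunks work together to give the flag a request/response lifecycle: PushGetBlockLocatorMsg raises it when a getBlockLocator message is queued, OnBlockLocator lowers it when the locator arrives, and shouldReplaceSyncPeer treats a raised flag as "this peer is still busy". A compressed, self-contained sketch of that lifecycle with stand-in types:

```go
package main

import "fmt"

type peer struct{ wasBlockLocatorRequested bool }

// pushGetBlockLocator stands in for PushGetBlockLocatorMsg: the request
// is queued, so the peer now owes us a block locator.
func (p *peer) pushGetBlockLocator() { p.wasBlockLocatorRequested = true }

// onBlockLocator stands in for the OnBlockLocator handler: the locator
// arrived, so the debt is cleared.
func (p *peer) onBlockLocator() { p.wasBlockLocatorRequested = false }

// replaceable mirrors the last clause of shouldReplaceSyncPeer: a peer
// that still owes a block locator must not be swapped out.
func (p *peer) replaceable() bool { return !p.wasBlockLocatorRequested }

func main() {
	p := &peer{}
	p.pushGetBlockLocator()
	fmt.Println("replaceable after request:", p.replaceable()) // false
	p.onBlockLocator()
	fmt.Println("replaceable after response:", p.replaceable()) // true
}
```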
@@ -24,6 +24,7 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
 			log.Criticalf("Goroutine stack trace: %s", goroutineStackTrace)
 		}
+		log.Criticalf("Stack trace: %s", debug.Stack())

 		log.Backend().Close()
 		close(panicHandlerDone)
 	}()
@@ -34,8 +35,9 @@ func HandlePanic(log *logs.Logger, goroutineStackTrace []byte) {
 		fmt.Fprintln(os.Stderr, "Couldn't handle a fatal error. Exiting...")
 	case <-panicHandlerDone:
 	}
-	log.Criticalf("Exiting")
+	fmt.Print("Exiting...")
 	os.Exit(1)
+	fmt.Print("After os.Exit(1)")
 }

 // GoroutineWrapperFunc returns a goroutine wrapper function that handles panics and writes them to the log.
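These two hunks make HandlePanic log the panicking goroutine's own stack (not just the creator's) and switch the final prints to fmt, since the log backend has already been closed at that point. For context, HandlePanic is meant to be deferred inside spawned goroutines, as the GoroutineWrapperFunc comment suggests; a simplified, self-contained sketch of that usage pattern, using the standard library logger instead of kaspad's logs package:

```go
package main

import (
	"log"
	"runtime/debug"
)

// handlePanic is a simplified analogue of kaspad's HandlePanic: it
// recovers, logs the error plus both stack traces, and exits.
func handlePanic(goroutineStackTrace []byte) {
	err := recover()
	if err == nil {
		return
	}
	log.Printf("Fatal error: %+v", err)
	if goroutineStackTrace != nil {
		log.Printf("Goroutine stack trace: %s", goroutineStackTrace)
	}
	log.Printf("Stack trace: %s", debug.Stack())
	log.Fatal("Exiting") // log.Fatal calls os.Exit(1) after printing
}

// spawn launches f with panic handling attached, capturing the
// creator's stack so a crash can be traced back to whoever spawned it.
func spawn(f func()) {
	goroutineStackTrace := debug.Stack()
	go func() {
		defer handlePanic(goroutineStackTrace)
		f()
	}()
}

func main() {
	spawn(func() { panic("something went wrong") })
	select {} // block until handlePanic logs and exits the process
}
```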
@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
 const (
 	appMajor uint = 0
 	appMinor uint = 3
-	appPatch uint = 0
+	appPatch uint = 1
 )

 // appBuild is defined as a variable so it can be overridden during the build
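The release itself is just the patch bump above. For context, constants like these are conventionally joined into a semver-style "major.minor.patch" string; a minimal sketch of that convention (the repo's version package also folds in appBuild, per the trailing comment, which is omitted here):

```go
package main

import "fmt"

const (
	appMajor uint = 0
	appMinor uint = 3
	appPatch uint = 1
)

// versionString joins the components into a semver-style string.
func versionString() string {
	return fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
}

func main() {
	fmt.Println(versionString()) // 0.3.1
}
```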