diff --git a/netsync/manager.go b/netsync/manager.go index 15670f470..2d847c2a5 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -158,7 +158,7 @@ type SyncManager struct { wg sync.WaitGroup quit chan struct{} - // These fields should only be accessed from the blockHandler thread + // These fields should only be accessed from the messageHandler thread rejectedTxns map[daghash.TxID]struct{} requestedTxns map[daghash.TxID]struct{} requestedBlocks map[daghash.Hash]struct{} @@ -865,6 +865,19 @@ func (sm *SyncManager) limitHashMap(m map[daghash.Hash]struct{}, limit int) { } } +func (sm *SyncManager) handleProcessBlockMsg(msg processBlockMsg) (isOrphan bool, err error) { + isOrphan, isDelayed, err := sm.dag.ProcessBlock( + msg.block, msg.flags) + if err != nil { + return false, err + } + if isDelayed { + return false, errors.New("Cannot process blocks from RPC beyond the allowed time offset") + } + + return isOrphan, nil +} + func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) { peer := msg.peer selectedTipHash := msg.selectedTipHash @@ -882,13 +895,13 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) { sm.startSync() } -// blockHandler is the main handler for the sync manager. It must be run as a +// messageHandler is the main handler for the sync manager. It must be run as a // goroutine. It processes block and inv messages in a separate goroutine // from the peer handlers so the block (MsgBlock) messages are handled by a // single thread without needing to lock memory data structures. This is // important because the sync manager controls which blocks are needed and how // the fetching should proceed. -func (sm *SyncManager) blockHandler() { +func (sm *SyncManager) messageHandler() { out: for { select { @@ -922,24 +935,10 @@ out: msg.reply <- peerID case processBlockMsg: - isOrphan, isDelayed, err := sm.dag.ProcessBlock( - msg.block, msg.flags) - if err != nil { - msg.reply <- processBlockResponse{ - isOrphan: false, - err: err, - } - } - if isDelayed { - msg.reply <- processBlockResponse{ - isOrphan: false, - err: errors.New("Cannot process blocks from RPC beyond the allowed time offset"), - } - } - + isOrphan, err := sm.handleProcessBlockMsg(msg) msg.reply <- processBlockResponse{ isOrphan: isOrphan, - err: nil, + err: err, } case isCurrentMsg: @@ -1092,7 +1091,7 @@ func (sm *SyncManager) Start() { log.Trace("Starting sync manager") sm.wg.Add(1) - spawn(sm.blockHandler, sm.handlePanic) + spawn(sm.messageHandler, sm.handlePanic) } func (sm *SyncManager) handlePanic() { @@ -1123,7 +1122,7 @@ func (sm *SyncManager) SyncPeerID() int32 { // ProcessBlock makes use of ProcessBlock on an internal instance of a blockDAG. func (sm *SyncManager) ProcessBlock(block *util.Block, flags blockdag.BehaviorFlags) (bool, error) { - reply := make(chan processBlockResponse, 1) + reply := make(chan processBlockResponse) sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} response := <-reply return response.isOrphan, response.err