[NOD-722] Fix processBlockMsg case in blockHandler to send only one response to msg.reply, and rename blockHandler->messageHandler (#622)

This commit is contained in:
Ori Newman 2020-02-04 18:10:15 +02:00 committed by GitHub
parent 45dc1a3e7b
commit d90a08ecfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -158,7 +158,7 @@ type SyncManager struct {
wg sync.WaitGroup
quit chan struct{}
// These fields should only be accessed from the blockHandler thread
// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{}
@ -865,6 +865,19 @@ func (sm *SyncManager) limitHashMap(m map[daghash.Hash]struct{}, limit int) {
}
}
func (sm *SyncManager) handleProcessBlockMsg(msg processBlockMsg) (isOrphan bool, err error) {
isOrphan, isDelayed, err := sm.dag.ProcessBlock(
msg.block, msg.flags)
if err != nil {
return false, err
}
if isDelayed {
return false, errors.New("Cannot process blocks from RPC beyond the allowed time offset")
}
return isOrphan, nil
}
func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
peer := msg.peer
selectedTipHash := msg.selectedTipHash
@ -882,13 +895,13 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
sm.startSync()
}
// blockHandler is the main handler for the sync manager. It must be run as a
// messageHandler is the main handler for the sync manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
// single thread without needing to lock memory data structures. This is
// important because the sync manager controls which blocks are needed and how
// the fetching should proceed.
func (sm *SyncManager) blockHandler() {
func (sm *SyncManager) messageHandler() {
out:
for {
select {
@ -922,24 +935,10 @@ out:
msg.reply <- peerID
case processBlockMsg:
isOrphan, isDelayed, err := sm.dag.ProcessBlock(
msg.block, msg.flags)
if err != nil {
msg.reply <- processBlockResponse{
isOrphan: false,
err: err,
}
}
if isDelayed {
msg.reply <- processBlockResponse{
isOrphan: false,
err: errors.New("Cannot process blocks from RPC beyond the allowed time offset"),
}
}
isOrphan, err := sm.handleProcessBlockMsg(msg)
msg.reply <- processBlockResponse{
isOrphan: isOrphan,
err: nil,
err: err,
}
case isCurrentMsg:
@ -1092,7 +1091,7 @@ func (sm *SyncManager) Start() {
log.Trace("Starting sync manager")
sm.wg.Add(1)
spawn(sm.blockHandler, sm.handlePanic)
spawn(sm.messageHandler, sm.handlePanic)
}
func (sm *SyncManager) handlePanic() {
@ -1123,7 +1122,7 @@ func (sm *SyncManager) SyncPeerID() int32 {
// ProcessBlock makes use of ProcessBlock on an internal instance of a blockDAG.
func (sm *SyncManager) ProcessBlock(block *util.Block, flags blockdag.BehaviorFlags) (bool, error) {
reply := make(chan processBlockResponse, 1)
reply := make(chan processBlockResponse)
sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
response := <-reply
return response.isOrphan, response.err