[NOD-422] Separate request queue and request queue set by inv type (#471)

* [NOD-422] Separate request queue and request queue set by inv type

* [NOD-422] Make one-liner pop and remove redundant nil assignment
This commit is contained in:
Ori Newman 2019-11-18 15:34:01 +02:00 committed by Svarog
parent 0f34cfb1a2
commit cba346d753
2 changed files with 69 additions and 65 deletions

View File

@ -135,19 +135,17 @@ type headerNode struct {
hash *daghash.Hash hash *daghash.Hash
} }
type requestQueueAndSet struct {
queue []*wire.InvVect
set map[daghash.Hash]struct{}
}
// peerSyncState stores additional information that the SyncManager tracks // peerSyncState stores additional information that the SyncManager tracks
// about a peer. // about a peer.
type peerSyncState struct { type peerSyncState struct {
syncCandidate bool syncCandidate bool
requestQueueMtx sync.Mutex requestQueueMtx sync.Mutex
// relayedInvsRequestQueue contains invs of blocks and transactions requestQueues map[wire.InvType]*requestQueueAndSet
// which are relayed to our node.
relayedInvsRequestQueue []*wire.InvVect
// requestQueue contains all of the invs that are not relayed to
// us; we get them by requesting them or by manually creating them.
requestQueue []*wire.InvVect
relayedInvsRequestQueueSet map[daghash.Hash]struct{}
requestQueueSet map[daghash.Hash]struct{}
requestedTxns map[daghash.TxID]struct{} requestedTxns map[daghash.TxID]struct{}
requestedBlocks map[daghash.Hash]struct{} requestedBlocks map[daghash.Hash]struct{}
} }
@ -358,12 +356,18 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
// Initialize the peer state // Initialize the peer state
isSyncCandidate := sm.isSyncCandidate(peer) isSyncCandidate := sm.isSyncCandidate(peer)
requestQueues := make(map[wire.InvType]*requestQueueAndSet)
requestQueueInvTypes := []wire.InvType{wire.InvTypeTx, wire.InvTypeBlock, wire.InvTypeSyncBlock}
for _, invType := range requestQueueInvTypes {
requestQueues[invType] = &requestQueueAndSet{
set: make(map[daghash.Hash]struct{}),
}
}
sm.peerStates[peer] = &peerSyncState{ sm.peerStates[peer] = &peerSyncState{
syncCandidate: isSyncCandidate, syncCandidate: isSyncCandidate,
requestedTxns: make(map[daghash.TxID]struct{}), requestedTxns: make(map[daghash.TxID]struct{}),
requestedBlocks: make(map[daghash.Hash]struct{}), requestedBlocks: make(map[daghash.Hash]struct{}),
requestQueueSet: make(map[daghash.Hash]struct{}), requestQueues: requestQueues,
relayedInvsRequestQueueSet: make(map[daghash.Hash]struct{}),
} }
// Start syncing by choosing the best candidate if needed. // Start syncing by choosing the best candidate if needed.
@ -511,7 +515,7 @@ func (sm *SyncManager) restartSyncIfNeeded() {
isWaitingForBlocks := func() bool { isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock() syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock() defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueue) != 0 return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}() }()
if isWaitingForBlocks { if isWaitingForBlocks {
return return
@ -708,33 +712,31 @@ func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*d
defer state.requestQueueMtx.Unlock() defer state.requestQueueMtx.Unlock()
for _, hash := range hashes { for _, hash := range hashes {
if _, exists := sm.requestedBlocks[*hash]; !exists { if _, exists := sm.requestedBlocks[*hash]; !exists {
iv := wire.NewInvVect(wire.InvTypeBlock, hash) invType := wire.InvTypeSyncBlock
state.addInvToRequestQueueNoLock(iv, isRelayedInv)
}
}
}
func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect, isRelayedInv bool) {
if iv.Type == wire.InvTypeTx {
log.Criticalf("Adding tx inv %s to request queue", iv.Hash)
}
if isRelayedInv { if isRelayedInv {
if _, exists := state.relayedInvsRequestQueueSet[*iv.Hash]; !exists { invType = wire.InvTypeBlock
state.relayedInvsRequestQueueSet[*iv.Hash] = struct{}{}
state.relayedInvsRequestQueue = append(state.relayedInvsRequestQueue, iv)
} }
} else { iv := wire.NewInvVect(invType, hash)
if _, exists := state.requestQueueSet[*iv.Hash]; !exists { state.addInvToRequestQueueNoLock(iv)
state.requestQueueSet[*iv.Hash] = struct{}{}
state.requestQueue = append(state.requestQueue, iv)
} }
} }
} }
func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect, isRelayedInv bool) { func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect) {
requestQueue, ok := state.requestQueues[iv.Type]
if !ok {
panic(errors.Errorf("got unsupported inventory type %s", iv.Type))
}
if _, exists := requestQueue.set[*iv.Hash]; !exists {
requestQueue.set[*iv.Hash] = struct{}{}
requestQueue.queue = append(requestQueue.queue, iv)
}
}
func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect) {
state.requestQueueMtx.Lock() state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock() defer state.requestQueueMtx.Unlock()
state.addInvToRequestQueueNoLock(iv, isRelayedInv) state.addInvToRequestQueueNoLock(iv)
} }
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
@ -1005,7 +1007,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
} }
// Add it to the request queue. // Add it to the request queue.
state.addInvToRequestQueue(iv, iv.Type != wire.InvTypeSyncBlock) state.addInvToRequestQueue(iv)
continue continue
} }
@ -1050,39 +1052,37 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
} }
} }
func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, requestQueue []*wire.InvVect) ([]*wire.InvVect, error) { func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, invType wire.InvType, maxInvsToAdd int) error {
requestQueue, ok := state.requestQueues[invType]
if !ok {
panic(errors.Errorf("got unsupported inventory type %s", invType))
}
queue := requestQueue.queue
var invsNum int var invsNum int
leftSpaceInGdmsg := wire.MaxInvPerGetDataMsg - len(gdmsg.InvList) leftSpaceInGdmsg := wire.MaxInvPerGetDataMsg - len(gdmsg.InvList)
if len(requestQueue) > leftSpaceInGdmsg { if len(queue) > leftSpaceInGdmsg {
invsNum = leftSpaceInGdmsg invsNum = leftSpaceInGdmsg
} else { } else {
invsNum = len(requestQueue) invsNum = len(queue)
}
if invsNum > maxInvsToAdd {
invsNum = maxInvsToAdd
} }
invsToAdd := make([]*wire.InvVect, 0, invsNum) invsToAdd := make([]*wire.InvVect, 0, invsNum)
addedBlocks := 0 for len(queue) != 0 && len(invsToAdd) < invsNum {
for len(requestQueue) != 0 && len(invsToAdd) < invsNum { var iv *wire.InvVect
iv := requestQueue[0] iv, queue = queue[0], queue[1:]
requestQueue[0] = nil
requestQueue = requestQueue[1:]
if iv.Type == wire.InvTypeSyncBlock || iv.Type == wire.InvTypeBlock {
if addedBlocks >= wire.MaxBlockInvPerGetDataMsg {
continue
}
addedBlocks++
}
exists, err := sm.haveInventory(iv) exists, err := sm.haveInventory(iv)
if err != nil { if err != nil {
return nil, err return err
} }
if !exists { if !exists {
invsToAdd = append(invsToAdd, iv) invsToAdd = append(invsToAdd, iv)
} }
} }
addBlockInv := func(requestQueueSet map[daghash.Hash]struct{}, iv *wire.InvVect) { addBlockInv := func(iv *wire.InvVect) {
delete(requestQueueSet, *iv.Hash)
// Request the block if there is not already a pending // Request the block if there is not already a pending
// request. // request.
if _, exists := sm.requestedBlocks[*iv.Hash]; !exists { if _, exists := sm.requestedBlocks[*iv.Hash]; !exists {
@ -1094,14 +1094,14 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData,
} }
} }
for _, iv := range invsToAdd { for _, iv := range invsToAdd {
switch iv.Type { delete(requestQueue.set, *iv.Hash)
switch invType {
case wire.InvTypeSyncBlock: case wire.InvTypeSyncBlock:
addBlockInv(state.requestQueueSet, iv) addBlockInv(iv)
case wire.InvTypeBlock: case wire.InvTypeBlock:
addBlockInv(state.relayedInvsRequestQueueSet, iv) addBlockInv(iv)
case wire.InvTypeTx: case wire.InvTypeTx:
delete(state.relayedInvsRequestQueueSet, *iv.Hash)
// Request the transaction if there is not already a // Request the transaction if there is not already a
// pending request. // pending request.
if _, exists := sm.requestedTxns[daghash.TxID(*iv.Hash)]; !exists { if _, exists := sm.requestedTxns[daghash.TxID(*iv.Hash)]; !exists {
@ -1113,28 +1113,32 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData,
} }
} }
if len(requestQueue) >= wire.MaxInvPerGetDataMsg { if len(queue) >= wire.MaxInvPerGetDataMsg {
break break
} }
} }
return requestQueue, nil requestQueue.queue = queue
return nil
} }
func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error { func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error {
state.requestQueueMtx.Lock() state.requestQueueMtx.Lock()
defer state.requestQueueMtx.Unlock() defer state.requestQueueMtx.Unlock()
gdmsg := wire.NewMsgGetData() gdmsg := wire.NewMsgGetData()
newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.requestQueue) err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeSyncBlock, wire.MaxSyncBlockInvPerGetDataMsg)
if err != nil { if err != nil {
return err return err
} }
state.requestQueue = newRequestQueue
if sm.current() { if sm.current() {
newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.relayedInvsRequestQueue) err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg)
if err != nil {
return err
}
err = sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeTx, wire.MaxInvPerGetDataMsg)
if err != nil { if err != nil {
return err return err
} }
state.relayedInvsRequestQueue = newRequestQueue
} }
if len(gdmsg.InvList) > 0 { if len(gdmsg.InvList) > 0 {
peer.QueueMessage(gdmsg, nil) peer.QueueMessage(gdmsg, nil)

View File

@ -16,9 +16,9 @@ const (
// single bitcoin inv message. // single bitcoin inv message.
MaxInvPerMsg = 1 << 16 MaxInvPerMsg = 1 << 16
// MaxBlockInvPerGetDataMsg is the maximum number of block inventory vectors that can // MaxSyncBlockInvPerGetDataMsg is the maximum number of sync block inventory
// be in a single getData message. // vectors that can be in a single getData message.
MaxBlockInvPerGetDataMsg = 50 MaxSyncBlockInvPerGetDataMsg = 50
// MaxInvPerGetDataMsg is the maximum number of inventory vectors that can // MaxInvPerGetDataMsg is the maximum number of inventory vectors that can
// be in a single getData message. // be in a single getData message.