diff --git a/netsync/manager.go b/netsync/manager.go index 030dbe06a..da366a010 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -135,21 +135,19 @@ type headerNode struct { hash *daghash.Hash } +type requestQueueAndSet struct { + queue []*wire.InvVect + set map[daghash.Hash]struct{} +} + // peerSyncState stores additional information that the SyncManager tracks // about a peer. type peerSyncState struct { syncCandidate bool requestQueueMtx sync.Mutex - // relayedInvsRequestQueue contains invs of blocks and transactions - // which are relayed to our node. - relayedInvsRequestQueue []*wire.InvVect - // requestQueue contains all of the invs that are not relayed to - // us; we get them by requesting them or by manually creating them. - requestQueue []*wire.InvVect - relayedInvsRequestQueueSet map[daghash.Hash]struct{} - requestQueueSet map[daghash.Hash]struct{} - requestedTxns map[daghash.TxID]struct{} - requestedBlocks map[daghash.Hash]struct{} + requestQueues map[wire.InvType]*requestQueueAndSet + requestedTxns map[daghash.TxID]struct{} + requestedBlocks map[daghash.Hash]struct{} } // SyncManager is used to communicate block related messages with peers. The @@ -358,12 +356,18 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { // Initialize the peer state isSyncCandidate := sm.isSyncCandidate(peer) + requestQueues := make(map[wire.InvType]*requestQueueAndSet) + requestQueueInvTypes := []wire.InvType{wire.InvTypeTx, wire.InvTypeBlock, wire.InvTypeSyncBlock} + for _, invType := range requestQueueInvTypes { + requestQueues[invType] = &requestQueueAndSet{ + set: make(map[daghash.Hash]struct{}), + } + } sm.peerStates[peer] = &peerSyncState{ - syncCandidate: isSyncCandidate, - requestedTxns: make(map[daghash.TxID]struct{}), - requestedBlocks: make(map[daghash.Hash]struct{}), - requestQueueSet: make(map[daghash.Hash]struct{}), - relayedInvsRequestQueueSet: make(map[daghash.Hash]struct{}), + syncCandidate: isSyncCandidate, + requestedTxns: make(map[daghash.TxID]struct{}), + requestedBlocks: make(map[daghash.Hash]struct{}), + requestQueues: requestQueues, } // Start syncing by choosing the best candidate if needed. @@ -511,7 +515,7 @@ func (sm *SyncManager) restartSyncIfNeeded() { isWaitingForBlocks := func() bool { syncPeerState.requestQueueMtx.Lock() defer syncPeerState.requestQueueMtx.Unlock() - return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueue) != 0 + return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0 }() if isWaitingForBlocks { return @@ -708,33 +712,31 @@ func (sm *SyncManager) addBlocksToRequestQueue(state *peerSyncState, hashes []*d defer state.requestQueueMtx.Unlock() for _, hash := range hashes { if _, exists := sm.requestedBlocks[*hash]; !exists { - iv := wire.NewInvVect(wire.InvTypeBlock, hash) - state.addInvToRequestQueueNoLock(iv, isRelayedInv) + invType := wire.InvTypeSyncBlock + if isRelayedInv { + invType = wire.InvTypeBlock + } + iv := wire.NewInvVect(invType, hash) + state.addInvToRequestQueueNoLock(iv) } } } -func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect, isRelayedInv bool) { - if iv.Type == wire.InvTypeTx { - log.Criticalf("Adding tx inv %s to request queue", iv.Hash) +func (state *peerSyncState) addInvToRequestQueueNoLock(iv *wire.InvVect) { + requestQueue, ok := state.requestQueues[iv.Type] + if !ok { + panic(errors.Errorf("got unsupported inventory type %s", iv.Type)) } - if isRelayedInv { - if _, exists := state.relayedInvsRequestQueueSet[*iv.Hash]; !exists { - state.relayedInvsRequestQueueSet[*iv.Hash] = struct{}{} - state.relayedInvsRequestQueue = append(state.relayedInvsRequestQueue, iv) - } - } else { - if _, exists := state.requestQueueSet[*iv.Hash]; !exists { - state.requestQueueSet[*iv.Hash] = struct{}{} - state.requestQueue = append(state.requestQueue, iv) - } + if _, exists := requestQueue.set[*iv.Hash]; !exists { + requestQueue.set[*iv.Hash] = struct{}{} + requestQueue.queue = append(requestQueue.queue, iv) } } -func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect, isRelayedInv bool) { +func (state *peerSyncState) addInvToRequestQueue(iv *wire.InvVect) { state.requestQueueMtx.Lock() defer state.requestQueueMtx.Unlock() - state.addInvToRequestQueueNoLock(iv, isRelayedInv) + state.addInvToRequestQueueNoLock(iv) } // fetchHeaderBlocks creates and sends a request to the syncPeer for the next @@ -1005,7 +1007,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } // Add it to the request queue. - state.addInvToRequestQueue(iv, iv.Type != wire.InvTypeSyncBlock) + state.addInvToRequestQueue(iv) continue } @@ -1050,39 +1052,37 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } } -func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, requestQueue []*wire.InvVect) ([]*wire.InvVect, error) { +func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, state *peerSyncState, invType wire.InvType, maxInvsToAdd int) error { + requestQueue, ok := state.requestQueues[invType] + if !ok { + panic(errors.Errorf("got unsupported inventory type %s", invType)) + } + queue := requestQueue.queue var invsNum int leftSpaceInGdmsg := wire.MaxInvPerGetDataMsg - len(gdmsg.InvList) - if len(requestQueue) > leftSpaceInGdmsg { + if len(queue) > leftSpaceInGdmsg { invsNum = leftSpaceInGdmsg } else { - invsNum = len(requestQueue) + invsNum = len(queue) + } + if invsNum > maxInvsToAdd { + invsNum = maxInvsToAdd } invsToAdd := make([]*wire.InvVect, 0, invsNum) - addedBlocks := 0 - for len(requestQueue) != 0 && len(invsToAdd) < invsNum { - iv := requestQueue[0] - requestQueue[0] = nil - requestQueue = requestQueue[1:] - - if iv.Type == wire.InvTypeSyncBlock || iv.Type == wire.InvTypeBlock { - if addedBlocks >= wire.MaxBlockInvPerGetDataMsg { - continue - } - addedBlocks++ - } + for len(queue) != 0 && len(invsToAdd) < invsNum { + var iv *wire.InvVect + iv, queue = queue[0], queue[1:] exists, err := sm.haveInventory(iv) if err != nil { - return nil, err + return err } if !exists { invsToAdd = append(invsToAdd, iv) } } - addBlockInv := func(requestQueueSet map[daghash.Hash]struct{}, iv *wire.InvVect) { - delete(requestQueueSet, *iv.Hash) + addBlockInv := func(iv *wire.InvVect) { // Request the block if there is not already a pending // request. if _, exists := sm.requestedBlocks[*iv.Hash]; !exists { @@ -1094,14 +1094,14 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, } } for _, iv := range invsToAdd { - switch iv.Type { + delete(requestQueue.set, *iv.Hash) + switch invType { case wire.InvTypeSyncBlock: - addBlockInv(state.requestQueueSet, iv) + addBlockInv(iv) case wire.InvTypeBlock: - addBlockInv(state.relayedInvsRequestQueueSet, iv) + addBlockInv(iv) case wire.InvTypeTx: - delete(state.relayedInvsRequestQueueSet, *iv.Hash) // Request the transaction if there is not already a // pending request. if _, exists := sm.requestedTxns[daghash.TxID(*iv.Hash)]; !exists { @@ -1113,28 +1113,32 @@ func (sm *SyncManager) addInvsToGetDataMessageFromQueue(gdmsg *wire.MsgGetData, } } - if len(requestQueue) >= wire.MaxInvPerGetDataMsg { + if len(queue) >= wire.MaxInvPerGetDataMsg { break } } - return requestQueue, nil + requestQueue.queue = queue + return nil } func (sm *SyncManager) sendInvsFromRequestQueue(peer *peerpkg.Peer, state *peerSyncState) error { state.requestQueueMtx.Lock() defer state.requestQueueMtx.Unlock() gdmsg := wire.NewMsgGetData() - newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.requestQueue) + err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeSyncBlock, wire.MaxSyncBlockInvPerGetDataMsg) if err != nil { return err } - state.requestQueue = newRequestQueue if sm.current() { - newRequestQueue, err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, state.relayedInvsRequestQueue) + err := sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeBlock, wire.MaxInvPerGetDataMsg) + if err != nil { + return err + } + + err = sm.addInvsToGetDataMessageFromQueue(gdmsg, state, wire.InvTypeTx, wire.MaxInvPerGetDataMsg) if err != nil { return err } - state.relayedInvsRequestQueue = newRequestQueue } if len(gdmsg.InvList) > 0 { peer.QueueMessage(gdmsg, nil) diff --git a/wire/invvect.go b/wire/invvect.go index d7dd74f34..12fb53b47 100644 --- a/wire/invvect.go +++ b/wire/invvect.go @@ -16,9 +16,9 @@ const ( // single bitcoin inv message. MaxInvPerMsg = 1 << 16 - // MaxBlockInvPerGetDataMsg is the maximum number of block inventory vectors that can - // be in a single getData message. - MaxBlockInvPerGetDataMsg = 50 + // MaxSyncBlockInvPerGetDataMsg is the maximum number of sync block inventory + // vectors that can be in a single getData message. + MaxSyncBlockInvPerGetDataMsg = 50 // MaxInvPerGetDataMsg is the maximum number of inventory vectors that can // be in a single getData message.