diff --git a/app/protocol/flowcontext/blocks.go b/app/protocol/flowcontext/blocks.go index b2f99d6a4..65eefcc9c 100644 --- a/app/protocol/flowcontext/blocks.go +++ b/app/protocol/flowcontext/blocks.go @@ -150,7 +150,7 @@ func (f *FlowContext) TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool { return false } f.ibdPeer = ibdPeer - log.Infof("IBD started") + log.Infof("IBD started with peer %s", ibdPeer) return true } diff --git a/app/protocol/flows/v5/blockrelay/ibd.go b/app/protocol/flows/v5/blockrelay/ibd.go index 785e104d9..2b40e724e 100644 --- a/app/protocol/flows/v5/blockrelay/ibd.go +++ b/app/protocol/flows/v5/blockrelay/ibd.go @@ -81,88 +81,10 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er log.Debugf("Syncing blocks up to %s", relayBlockHash) log.Debugf("Trying to find highest known syncer chain block from peer %s with relay hash %s", flow.peer, relayBlockHash) - /* - Algorithm: - Request full selected chain block locator from syncer - Find the highest block which we know - Repeat the locator step over the new range until finding max(past(syncee) \cap chain(syncer)) - */ - - // Empty hashes indicate that the full chain is queried - locatorHashes, err := flow.getSyncerChainBlockLocator(nil, nil) + syncerHeaderSelectedTipHash, highestKnownSyncerChainHash, err := flow.negotiateMissingSyncerChainSegment() if err != nil { return err } - if len(locatorHashes) == 0 { - return protocolerrors.Errorf(true, "Expecting initial syncer chain block locator "+ - "to contain at least one element") - } - syncerHeaderSelectedTipHash := locatorHashes[0] - var highestKnownSyncerChainHash *externalapi.DomainHash - chainNegotiationRestartCounter := 0 - for { - var lowestUnknownSyncerChainHash, currentHighestKnownSyncerChainHash *externalapi.DomainHash - for _, syncerChainHash := range locatorHashes { - info, err := flow.Domain().Consensus().GetBlockInfo(syncerChainHash) - if err != nil { - return err - } - if info.Exists { - currentHighestKnownSyncerChainHash = syncerChainHash - break - } - lowestUnknownSyncerChainHash = syncerChainHash - } - // No shared block, break - if currentHighestKnownSyncerChainHash == nil { - break - } - // No point in zooming further - if len(locatorHashes) == 1 { - highestKnownSyncerChainHash = currentHighestKnownSyncerChainHash - break - } - // Zoom in - locatorHashes, err = flow.getSyncerChainBlockLocator( - lowestUnknownSyncerChainHash, - currentHighestKnownSyncerChainHash) - if err != nil { - return err - } - if len(locatorHashes) == 2 { - if !locatorHashes[0].Equal(lowestUnknownSyncerChainHash) || - !locatorHashes[1].Equal(currentHighestKnownSyncerChainHash) { - return protocolerrors.Errorf(true, "Expecting the high and low "+ - "hashes to match the locatorHashes if len(locatorHashes) is 2") - } - // We found our search target - highestKnownSyncerChainHash = currentHighestKnownSyncerChainHash - break - } - if len(locatorHashes) == 0 { - chainNegotiationRestartCounter++ - if chainNegotiationRestartCounter > 64 { - return protocolerrors.Errorf(false, - "Chain negotiation with syncer %s exceeded restart limit %d", flow.peer, chainNegotiationRestartCounter) - } - - // An empty locator signals that the syncer chain was modified and no longer contains one of - // the queried hashes, so we restart the search - locatorHashes, err = flow.getSyncerChainBlockLocator(nil, nil) - if err != nil { - return err - } - if len(locatorHashes) == 0 { - return protocolerrors.Errorf(true, "Expecting initial syncer chain block locator "+ - "to contain at least one element") - } - // Reset syncer's header selected tip - syncerHeaderSelectedTipHash = locatorHashes[0] - } - } - - log.Debugf("Found highest known syncer chain block %s from peer %s", - highestKnownSyncerChainHash, flow.peer) shouldDownloadHeadersProof, shouldSync, err := flow.shouldSyncAndShouldDownloadHeadersProof( block, highestKnownSyncerChainHash) @@ -227,6 +149,124 @@ func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) er return nil } +func (flow *handleIBDFlow) negotiateMissingSyncerChainSegment() (*externalapi.DomainHash, *externalapi.DomainHash, error) { + /* + Algorithm: + Request full selected chain block locator from syncer + Find the highest block which we know + Repeat the locator step over the new range until finding max(past(syncee) \cap chain(syncer)) + */ + + // Empty hashes indicate that the full chain is queried + locatorHashes, err := flow.getSyncerChainBlockLocator(nil, nil, common.DefaultTimeout) + if err != nil { + return nil, nil, err + } + if len(locatorHashes) == 0 { + return nil, nil, protocolerrors.Errorf(true, "Expecting initial syncer chain block locator "+ + "to contain at least one element") + } + log.Debugf("IBD chain negotiation with peer %s started and received %d hashes (%s, %s)", flow.peer, + len(locatorHashes), locatorHashes[0], locatorHashes[len(locatorHashes)-1]) + syncerHeaderSelectedTipHash := locatorHashes[0] + var highestKnownSyncerChainHash *externalapi.DomainHash + chainNegotiationRestartCounter := 0 + chainNegotiationZoomCounts := 0 + initialLocatorLen := len(locatorHashes) + for { + var lowestUnknownSyncerChainHash, currentHighestKnownSyncerChainHash *externalapi.DomainHash + for _, syncerChainHash := range locatorHashes { + info, err := flow.Domain().Consensus().GetBlockInfo(syncerChainHash) + if err != nil { + return nil, nil, err + } + if info.Exists { + currentHighestKnownSyncerChainHash = syncerChainHash + break + } + lowestUnknownSyncerChainHash = syncerChainHash + } + // No unknown blocks, break. Note this can only happen in the first iteration + if lowestUnknownSyncerChainHash == nil { + highestKnownSyncerChainHash = currentHighestKnownSyncerChainHash + break + } + // No shared block, break + if currentHighestKnownSyncerChainHash == nil { + highestKnownSyncerChainHash = nil + break + } + // No point in zooming further + if len(locatorHashes) == 1 { + highestKnownSyncerChainHash = currentHighestKnownSyncerChainHash + break + } + // Zoom in + locatorHashes, err = flow.getSyncerChainBlockLocator( + lowestUnknownSyncerChainHash, + currentHighestKnownSyncerChainHash, time.Second*10) + if err != nil { + return nil, nil, err + } + if len(locatorHashes) > 0 { + if !locatorHashes[0].Equal(lowestUnknownSyncerChainHash) || + !locatorHashes[len(locatorHashes)-1].Equal(currentHighestKnownSyncerChainHash) { + return nil, nil, protocolerrors.Errorf(true, "Expecting the high and low "+ + "hashes to match the locator bounds") + } + + chainNegotiationZoomCounts++ + log.Debugf("IBD chain negotiation with peer %s zoomed in (%d) and received %d hashes (%s, %s)", flow.peer, + chainNegotiationZoomCounts, len(locatorHashes), locatorHashes[0], locatorHashes[len(locatorHashes)-1]) + + if len(locatorHashes) == 2 { + // We found our search target + highestKnownSyncerChainHash = currentHighestKnownSyncerChainHash + break + } + + if chainNegotiationZoomCounts > initialLocatorLen*2 { + // Since the zoom-in always queries two consecutive entries in the previous locator, it is + // expected to decrease in size at least every two iterations + return nil, nil, protocolerrors.Errorf(true, + "IBD chain negotiation: Number of zoom-in steps %d exceeded the upper bound of 2*%d", + chainNegotiationZoomCounts, initialLocatorLen) + } + + } else { // Empty locator signals a restart due to chain changes + chainNegotiationZoomCounts = 0 + chainNegotiationRestartCounter++ + if chainNegotiationRestartCounter > 32 { + return nil, nil, protocolerrors.Errorf(false, + "IBD chain negotiation with syncer %s exceeded restart limit %d", flow.peer, chainNegotiationRestartCounter) + } + log.Warnf("IBD chain negotiation with syncer %s restarted %d times", flow.peer, chainNegotiationRestartCounter) + + // An empty locator signals that the syncer chain was modified and no longer contains one of + // the queried hashes, so we restart the search. We use a shorter timeout here to avoid a timeout attack + locatorHashes, err = flow.getSyncerChainBlockLocator(nil, nil, time.Second*10) + if err != nil { + return nil, nil, err + } + if len(locatorHashes) == 0 { + return nil, nil, protocolerrors.Errorf(true, "Expecting initial syncer chain block locator "+ + "to contain at least one element") + } + log.Infof("IBD chain negotiation with peer %s restarted (%d) and received %d hashes (%s, %s)", flow.peer, + chainNegotiationRestartCounter, len(locatorHashes), locatorHashes[0], locatorHashes[len(locatorHashes)-1]) + + initialLocatorLen = len(locatorHashes) + // Reset syncer's header selected tip + syncerHeaderSelectedTipHash = locatorHashes[0] + } + } + + log.Debugf("Found highest known syncer chain block %s from peer %s", + highestKnownSyncerChainHash, flow.peer) + + return syncerHeaderSelectedTipHash, highestKnownSyncerChainHash, nil +} + func (flow *handleIBDFlow) isGenesisVirtualSelectedParent() (bool, error) { virtualSelectedParent, err := flow.Domain().Consensus().GetVirtualSelectedParent() if err != nil { @@ -241,23 +281,29 @@ func (flow *handleIBDFlow) logIBDFinished(isFinishedSuccessfully bool) { if !isFinishedSuccessfully { successString = "(interrupted)" } - log.Infof("IBD finished %s", successString) + log.Infof("IBD with peer %s finished %s", flow.peer, successString) } func (flow *handleIBDFlow) getSyncerChainBlockLocator( - highHash, lowHash *externalapi.DomainHash) ([]*externalapi.DomainHash, error) { + highHash, lowHash *externalapi.DomainHash, timeout time.Duration) ([]*externalapi.DomainHash, error) { requestIbdChainBlockLocatorMessage := appmessage.NewMsgIBDRequestChainBlockLocator(highHash, lowHash) err := flow.outgoingRoute.Enqueue(requestIbdChainBlockLocatorMessage) if err != nil { return nil, err } - message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(timeout) if err != nil { return nil, err } switch message := message.(type) { case *appmessage.MsgIBDChainBlockLocator: + if len(message.BlockLocatorHashes) > 64 { + return nil, protocolerrors.Errorf(true, + "Got block locator of size %d>64 while expecting locator to have size "+ + "which is logarithmic in DAG size (which should never exceed 2^64)", + len(message.BlockLocatorHashes)) + } return message.BlockLocatorHashes, nil default: return nil, protocolerrors.Errorf(true, "received unexpected message type. "+ @@ -271,6 +317,11 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C log.Infof("Downloading headers from %s", flow.peer) + if highestKnownSyncerChainHash.Equal(syncerHeaderSelectedTipHash) { + // No need to get syncer selected tip headers, so sync relay past and return + return flow.syncMissingRelayPast(consensus, syncerHeaderSelectedTipHash, relayBlockHash) + } + err := flow.sendRequestHeaders(highestKnownSyncerChainHash, syncerHeaderSelectedTipHash) if err != nil { return err @@ -318,55 +369,7 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C select { case ibdBlocksMessage, ok := <-blockHeadersMessageChan: if !ok { - // Finished downloading syncer selected tip blocks, - // check if we already have the triggering relayBlockHash - relayBlockInfo, err := consensus.GetBlockInfo(relayBlockHash) - if err != nil { - return err - } - if !relayBlockInfo.Exists { - // Send a special header request for the selected tip anticone. This is expected to - // be a small set, as it is bounded to the size of virtual's mergeset. - err = flow.sendRequestAnticone(syncerHeaderSelectedTipHash, relayBlockHash) - if err != nil { - return err - } - anticoneHeadersMessage, anticoneDone, err := flow.receiveHeaders() - if err != nil { - return err - } - if anticoneDone { - return protocolerrors.Errorf(true, - "Expected one anticone header chunk for past(%s) cap anticone(%s) but got zero", - relayBlockHash, syncerHeaderSelectedTipHash) - } - _, anticoneDone, err = flow.receiveHeaders() - if err != nil { - return err - } - if !anticoneDone { - return protocolerrors.Errorf(true, - "Expected only one anticone header chunk for past(%s) cap anticone(%s)", - relayBlockHash, syncerHeaderSelectedTipHash) - } - for _, header := range anticoneHeadersMessage.BlockHeaders { - err = flow.processHeader(consensus, header) - if err != nil { - return err - } - } - } - - // If the relayBlockHash has still not been received, the peer is misbehaving - relayBlockInfo, err = consensus.GetBlockInfo(relayBlockHash) - if err != nil { - return err - } - if !relayBlockInfo.Exists { - return protocolerrors.Errorf(true, "did not receive "+ - "relayBlockHash block %s from peer %s during block download", relayBlockHash, flow.peer) - } - return nil + return flow.syncMissingRelayPast(consensus, syncerHeaderSelectedTipHash, relayBlockHash) } for _, header := range ibdBlocksMessage.BlockHeaders { err = flow.processHeader(consensus, header) @@ -383,6 +386,58 @@ func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.C } } +func (flow *handleIBDFlow) syncMissingRelayPast(consensus externalapi.Consensus, syncerHeaderSelectedTipHash *externalapi.DomainHash, relayBlockHash *externalapi.DomainHash) error { + // Finished downloading syncer selected tip blocks, + // check if we already have the triggering relayBlockHash + relayBlockInfo, err := consensus.GetBlockInfo(relayBlockHash) + if err != nil { + return err + } + if !relayBlockInfo.Exists { + // Send a special header request for the selected tip anticone. This is expected to + // be a small set, as it is bounded to the size of virtual's mergeset. + err = flow.sendRequestAnticone(syncerHeaderSelectedTipHash, relayBlockHash) + if err != nil { + return err + } + anticoneHeadersMessage, anticoneDone, err := flow.receiveHeaders() + if err != nil { + return err + } + if anticoneDone { + return protocolerrors.Errorf(true, + "Expected one anticone header chunk for past(%s) cap anticone(%s) but got zero", + relayBlockHash, syncerHeaderSelectedTipHash) + } + _, anticoneDone, err = flow.receiveHeaders() + if err != nil { + return err + } + if !anticoneDone { + return protocolerrors.Errorf(true, + "Expected only one anticone header chunk for past(%s) cap anticone(%s)", + relayBlockHash, syncerHeaderSelectedTipHash) + } + for _, header := range anticoneHeadersMessage.BlockHeaders { + err = flow.processHeader(consensus, header) + if err != nil { + return err + } + } + } + + // If the relayBlockHash has still not been received, the peer is misbehaving + relayBlockInfo, err = consensus.GetBlockInfo(relayBlockHash) + if err != nil { + return err + } + if !relayBlockInfo.Exists { + return protocolerrors.Errorf(true, "did not receive "+ + "relayBlockHash block %s from peer %s during block download", relayBlockHash, flow.peer) + } + return nil +} + func (flow *handleIBDFlow) sendRequestAnticone( syncerHeaderSelectedTipHash, relayBlockHash *externalapi.DomainHash) error { diff --git a/changelog.txt b/changelog.txt index 3133752c8..9a6dbb824 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,7 @@ +Kaspad v0.11.14 - 2022-03-20 +=========================== +* Fix a bug in the new p2p v5 IBD chain negotiation (#1981) + Kaspad v0.11.13 - 2022-03-16 =========================== * Display progress of IBD process in Kaspad logs (#1938, #1939, #1949, #1977) diff --git a/version/version.go b/version/version.go index 3ee06afae..f79c176f9 100644 --- a/version/version.go +++ b/version/version.go @@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs const ( appMajor uint = 0 appMinor uint = 11 - appPatch uint = 13 + appPatch uint = 14 ) // appBuild is defined as a variable so it can be overridden during the build