From acb4b3f260b4729cf5e99882230a761f91c69fbf Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Tue, 19 Nov 2019 11:22:17 +0200 Subject: [PATCH] [NOD-434] Re-request missing parents when adding a block (#476) * [NOD-434] Add the same enqueue/process mechanism as chainChangedMsgs for blockAddedMsgs. * [NOD-434] Clean up after merge. * [NOD-434] Implement mechanism for re-requesting missing parent blocks. * [NOD-434] Fixed bad error message. * [NOD-434] Split processBlockAddedMsgs. * [NOD-434] Name return values in canHandleBlockAddedMsg. * [NOD-434] Rename canHandleBlockAddedMsg to missingParentHashes and fix bad loop break. * [NOD-434] Rename the variable missingParentHashes to missingHashes. * [NOD-434] Rename a couple of variables. * [NOD-434] Rename outerloop to outerLoop. * [NOD-434] Fix typo and remove superfluous continue. * [NOD-412] Change Warnf to Errorf where appropriate. --- apiserver/sync.go | 156 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 139 insertions(+), 17 deletions(-) diff --git a/apiserver/sync.go b/apiserver/sync.go index 942fe8214..059a5e149 100644 --- a/apiserver/sync.go +++ b/apiserver/sync.go @@ -23,6 +23,25 @@ import ( "github.com/pkg/errors" ) +var ( + // pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival + pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg + + // pendingChainChangedMsgs holds chainChangedMsgs in order of arrival + pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg + + // missingBlocks is a map between missing block hashes and the time + // they were first found to be missing. If a block is still missing + // after blockMissingTimeout then it gets re-requested from the node. + missingBlocks = make(map[string]time.Time) +) + +const ( + // blockMissingTimeout is the amount of time after which a missing block + // gets re-requested from the node. + blockMissingTimeout = time.Second * 10 +) + // startSync keeps the node and the API server in sync. 
On start, it downloads // all data that's missing from the API server, and once it's done it keeps // sync with the node via notifications. @@ -65,7 +84,8 @@ loop: for { select { case blockAdded := <-client.OnBlockAdded: - handleBlockAddedMsg(client, blockAdded) + enqueueBlockAddedMsg(blockAdded) + processBlockAddedMsgs(client) case chainChanged := <-client.OnChainChanged: enqueueChainChangedMsg(chainChanged) processChainChangedMsgs() @@ -284,6 +304,12 @@ func addBlock(client *jsonrpc.Client, block string, rawBlock btcjson.GetBlockVer } dbTx.Commit() + + // If the block was previously missing, remove it from + // the missing blocks collection. + if _, ok := missingBlocks[rawBlock.Hash]; ok { + delete(missingBlocks, rawBlock.Hash) + } return nil } @@ -896,24 +922,120 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error return nil } -// handleBlockAddedMsg handles onBlockAdded messages -func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) { - hash := blockAdded.Header.BlockHash() - log.Debugf("Got block %s from the RPC server", hash) - block, rawBlock, err := fetchBlock(client, hash) - if err != nil { - log.Warnf("Could not fetch block %s: %s", hash, err) - return - } - err = addBlock(client, block, *rawBlock) - if err != nil { - log.Warnf("Could not insert block %s: %s", hash, err) - return - } - log.Infof("Added block %s", hash) +// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later +func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) { + pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded) } -var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg +// processBlockAddedMsgs processes all pending onBlockAdded messages. +// Messages that cannot yet be processed are re-enqueued. 
+func processBlockAddedMsgs(client *jsonrpc.Client) {
+	// Messages whose parents are still missing are collected here and
+	// become the new pending queue once this pass is over.
+	var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
+	for _, blockAdded := range pendingBlockAddedMsgs {
+		missingHashes, err := missingParentHashes(blockAdded)
+		if err != nil {
+			panic(errors.Errorf("Could not resolve missing parents: %s", err))
+		}
+		for _, missingHash := range missingHashes {
+			if err := handleMissingParent(client, missingHash); err != nil {
+				log.Errorf("Failed to handle missing parent block %s: %s",
+					missingHash, err)
+			}
+		}
+		if len(missingHashes) > 0 {
+			// The block cannot be added before its parents. Keep the
+			// message around and try again on the next pass.
+			unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded)
+			continue
+		}
+
+		hash := blockAdded.Header.BlockHash()
+		log.Debugf("Getting block %s from the RPC server", hash)
+		block, rawBlock, err := fetchBlock(client, hash)
+		if err != nil {
+			// Drop only this message and move on to the next one.
+			// Returning here would skip the queue update below, leaving
+			// messages that were already handled in this pass in the
+			// pending queue to be processed a second time.
+			log.Warnf("Could not fetch block %s: %s", hash, err)
+			continue
+		}
+		err = addBlock(client, block, *rawBlock)
+		if err != nil {
+			// Same reasoning as above: drop this message, keep going.
+			log.Errorf("Could not insert block %s: %s", hash, err)
+			continue
+		}
+		log.Infof("Added block %s", hash)
+	}
+	pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
+}
+
+// missingParentHashes returns the hashes of those parents of the block
+// announced by blockAdded that are not found in the database. It returns
+// a nil slice when every parent is present, and an error when the
+// database cannot be reached or queried.
+func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
+	db, err := database.DB()
+	if err != nil {
+		return nil, err
+	}
+
+	// Collect all referenced parent hashes
+	hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes))
+	for _, hash := range blockAdded.Header.ParentHashes {
+		hashesIn = append(hashesIn, hash.String())
+	}
+
+	// Make sure that all the parent hashes exist in the database
+	var dbParentBlocks []dbmodels.Block
+	dbResult := db.
+		Model(&dbmodels.Block{}).
+		Where("block_hash in (?)", hashesIn).
+		Find(&dbParentBlocks)
+	dbErrors := dbResult.GetErrors()
+	if httpserverutils.HasDBError(dbErrors) {
+		return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors)
+	}
+	if len(hashesIn) != len(dbParentBlocks) {
+		// Some parent hashes are missing. Collect and return them.
+		// The local is named missingHashes so that it does not shadow
+		// this function's own name.
+		var missingHashes []string
+	outerLoop:
+		for _, hash := range hashesIn {
+			for _, dbParentBlock := range dbParentBlocks {
+				if dbParentBlock.BlockHash == hash {
+					continue outerLoop
+				}
+			}
+			missingHashes = append(missingHashes, hash)
+		}
+		return missingHashes, nil
+	}
+
+	return nil, nil
+}
+
+// handleMissingParent handles missing parent block hashes as follows:
+// a. If it's the first time we've encountered this block hash, add it to
+//    the missing blocks collection with time = now
+// b. Otherwise, if time + blockMissingTimeout < now (that is to say,
+//    blockMissingTimeout has elapsed) then request the block from the
+//    node
+// It returns an error if the hash cannot be parsed or if the block cannot
+// be fetched or inserted.
+func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error {
+	firstMissingTime, ok := missingBlocks[missingParentHash]
+	if !ok {
+		log.Infof("Parent block %s is missing", missingParentHash)
+		missingBlocks[missingParentHash] = time.Now()
+		return nil
+	}
+	// The recorded time is not refreshed below, so once the timeout has
+	// elapsed every further call re-requests the block for as long as
+	// its entry stays in missingBlocks.
+	if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) {
+		hash, err := daghash.NewHashFromStr(missingParentHash)
+		if err != nil {
+			return errors.Errorf("Could not create hash: %s", err)
+		}
+		block, rawBlock, err := fetchBlock(client, hash)
+		if err != nil {
+			return errors.Errorf("Could not fetch block %s: %s", hash, err)
+		}
+		err = addBlock(client, block, *rawBlock)
+		if err != nil {
+			return errors.Errorf("Could not insert block %s: %s", hash, err)
+		}
+		log.Infof("Parent block %s was fetched after missing for over %s",
+			missingParentHash, blockMissingTimeout)
+	}
+	return nil
+}
 
 // enqueueChainChangedMsg enqueues onChainChanged messages to be handled later
 func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {