[NOD-434] Re-request missing parents when adding a block (#476)

* [NOD-434] Add the same enqueue/process mechanism as chainChangedMsgs for blockAddedMsgs.

* [NOD-434] Clean up after merge.

* [NOD-434] Implement mechanism for re-requesting missing parent blocks.

* [NOD-434] Fixed bad error message.

* [NOD-434] Split processBlockAddedMsgs.

* [NOD-434] Name return values in canHandleBlockAddedMsg.

* [NOD-434] Rename canHandleBlockAddedMsg to missingParentHashes and fix bad loop break.

* [NOD-434] Rename the variable missingParentHashes to missingHashes.

* [NOD-434] Rename a couple of variables.

* [NOD-434] Rename outerloop to outerLoop.

* [NOD-434] Fix typo and remove superfluous continue.

* [NOD-412] Change Warnf to Errorf where appropriate.
This commit is contained in:
stasatdaglabs 2019-11-19 11:22:17 +02:00 committed by Svarog
parent e0221aa8ab
commit acb4b3f260

View File

@ -23,6 +23,25 @@ import (
"github.com/pkg/errors"
)
var (
// pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival
pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// missingBlocks is a map between missing block hashes and the time
// they were first found to be missing. If a block is still missing
// after blockMissingTimeout then it gets re-requested from the node.
missingBlocks = make(map[string]time.Time)
)
const (
// blockMissingTimeout is the amount of time after which a missing block
// gets re-requested from the node.
blockMissingTimeout = time.Second * 10
)
// startSync keeps the node and the API server in sync. On start, it downloads
// all data that's missing from the API server, and once it's done it keeps
// sync with the node via notifications.
@ -65,7 +84,8 @@ loop:
for {
select {
case blockAdded := <-client.OnBlockAdded:
handleBlockAddedMsg(client, blockAdded)
enqueueBlockAddedMsg(blockAdded)
processBlockAddedMsgs(client)
case chainChanged := <-client.OnChainChanged:
enqueueChainChangedMsg(chainChanged)
processChainChangedMsgs()
@ -284,6 +304,12 @@ func addBlock(client *jsonrpc.Client, block string, rawBlock btcjson.GetBlockVer
}
dbTx.Commit()
// If the block was previously missing, remove it from
// the missing blocks collection.
if _, ok := missingBlocks[rawBlock.Hash]; ok {
delete(missingBlocks, rawBlock.Hash)
}
return nil
}
@ -896,24 +922,120 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error
return nil
}
// handleBlockAddedMsg handles onBlockAdded messages
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
hash := blockAdded.Header.BlockHash()
log.Debugf("Got block %s from the RPC server", hash)
block, rawBlock, err := fetchBlock(client, hash)
if err != nil {
log.Warnf("Could not fetch block %s: %s", hash, err)
return
}
err = addBlock(client, block, *rawBlock)
if err != nil {
log.Warnf("Could not insert block %s: %s", hash, err)
return
}
log.Infof("Added block %s", hash)
// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later
func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) {
pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded)
}
var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// processBlockAddedMsgs processes all pending onBlockAdded messages.
// Messages that cannot yet be processed are re-enqueued.
func processBlockAddedMsgs(client *jsonrpc.Client) {
var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
for _, blockAdded := range pendingBlockAddedMsgs {
missingHashes, err := missingParentHashes(blockAdded)
if err != nil {
panic(errors.Errorf("Could not resolve missing parents: %s", err))
}
for _, missingHash := range missingHashes {
err := handleMissingParent(client, missingHash)
if err != nil {
log.Errorf("Failed to handle missing parent block %s: %s",
missingHash, err)
}
}
if len(missingHashes) > 0 {
unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded)
continue
}
hash := blockAdded.Header.BlockHash()
log.Debugf("Getting block %s from the RPC server", hash)
block, rawBlock, err := fetchBlock(client, hash)
if err != nil {
log.Warnf("Could not fetch block %s: %s", hash, err)
return
}
err = addBlock(client, block, *rawBlock)
if err != nil {
log.Errorf("Could not insert block %s: %s", hash, err)
return
}
log.Infof("Added block %s", hash)
}
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
}
func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
db, err := database.DB()
if err != nil {
return nil, err
}
// Collect all referenced parent hashes
hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes))
for _, hash := range blockAdded.Header.ParentHashes {
hashesIn = append(hashesIn, hash.String())
}
// Make sure that all the parent hashes exist in the database
var dbParentBlocks []dbmodels.Block
dbResult := db.
Model(&dbmodels.Block{}).
Where("block_hash in (?)", hashesIn).
Find(&dbParentBlocks)
dbErrors := dbResult.GetErrors()
if httpserverutils.HasDBError(dbErrors) {
return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors)
}
if len(hashesIn) != len(dbParentBlocks) {
// Some parent hashes are missing. Collect and return them
var missingParentHashes []string
outerLoop:
for _, hash := range hashesIn {
for _, dbParentBlock := range dbParentBlocks {
if dbParentBlock.BlockHash == hash {
continue outerLoop
}
}
missingParentHashes = append(missingParentHashes, hash)
}
return missingParentHashes, nil
}
return nil, nil
}
// handleMissingParent handles missing parent block hashes as follows:
// a. If it's the first time we've encountered this block hash, add it to
// the missing blocks collection with time = now
// b. Otherwise, if time + blockMissingTimeout < now (that is to say,
// blockMissingTimeout had elapsed) then request the block from the
// node
func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error {
firstMissingTime, ok := missingBlocks[missingParentHash]
if !ok {
log.Infof("Parent block %s is missing", missingParentHash)
missingBlocks[missingParentHash] = time.Now()
return nil
}
if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) {
hash, err := daghash.NewHashFromStr(missingParentHash)
if err != nil {
return errors.Errorf("Could not create hash: %s", err)
}
block, rawBlock, err := fetchBlock(client, hash)
if err != nil {
return errors.Errorf("Could not fetch block %s: %s", hash, err)
}
err = addBlock(client, block, *rawBlock)
if err != nil {
return errors.Errorf("Could not insert block %s: %s", hash, err)
}
log.Infof("Parent block %s was fetched after missing for over %s",
missingParentHash, blockMissingTimeout)
}
return nil
}
// enqueueChainChangedMsg enqueues onChainChanged messages to be handled later
func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {