diff --git a/apiserver/jsonrpc/log.go b/apiserver/jsonrpc/log.go new file mode 100644 index 000000000..f6a6267a2 --- /dev/null +++ b/apiserver/jsonrpc/log.go @@ -0,0 +1,16 @@ +package jsonrpc + +import ( + "github.com/daglabs/btcd/apiserver/logger" + "github.com/daglabs/btcd/rpcclient" + "github.com/daglabs/btcd/util/panics" +) + +var ( + log = logger.BackendLog.Logger("RPCC") + spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog) +) + +func init() { + rpcclient.UseLogger(log, logger.BackendLog) +} diff --git a/apiserver/sync.go b/apiserver/sync.go index 01ee78a1e..ed5734a6c 100644 --- a/apiserver/sync.go +++ b/apiserver/sync.go @@ -24,24 +24,8 @@ import ( "github.com/pkg/errors" ) -var ( - // pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival - pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg - - // pendingChainChangedMsgs holds chainChangedMsgs in order of arrival - pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg - - // missingBlocks is a map between missing block hashes and the time - // they were first found to be missing. If a block is still missing - // after blockMissingTimeout then it gets re-requested from the node. - missingBlocks = make(map[string]time.Time) -) - -const ( - // blockMissingTimeout is the amount of time after which a missing block - // gets re-requested from the node. - blockMissingTimeout = time.Second * 10 -) +// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival +var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg // startSync keeps the node and the API server in sync. On start, it downloads // all data that's missing from the API server, and once it's done it keeps @@ -85,8 +69,7 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error { for { select { case blockAdded := <-client.OnBlockAdded: - enqueueBlockAddedMsg(blockAdded) - err := processBlockAddedMsgs(client) + err := handleBlockAddedMsg(client, blockAdded) if err != nil { return err } @@ -201,23 +184,27 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) { // fetchBlock downloads the serialized block and raw block data of // the block with hash blockHash. func fetchBlock(client *jsonrpc.Client, blockHash *daghash.Hash) ( - rawBlock string, verboseBlock *btcjson.GetBlockVerboseResult, err error) { + *rawAndVerboseBlock, error) { + log.Debugf("Getting block %s from the RPC server", blockHash) msgBlock, err := client.GetBlock(blockHash, nil) if err != nil { - return "", nil, err + return nil, err } writer := bytes.NewBuffer(make([]byte, 0, msgBlock.SerializeSize())) err = msgBlock.Serialize(writer) if err != nil { - return "", nil, err + return nil, err } - rawBlock = hex.EncodeToString(writer.Bytes()) + rawBlock := hex.EncodeToString(writer.Bytes()) - verboseBlock, err = client.GetBlockVerboseTx(blockHash, nil) + verboseBlock, err := client.GetBlockVerboseTx(blockHash, nil) if err != nil { - return "", nil, err + return nil, err } - return rawBlock, verboseBlock, nil + return &rawAndVerboseBlock{ + rawBlock: rawBlock, + verboseBlock: verboseBlock, + }, nil } // addBlocks inserts data in the given rawBlocks and verboseBlocks pairwise @@ -319,12 +306,6 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB } dbTx.Commit() - - // If the block was previously missing, remove it from - // the missing blocks collection. - if _, ok := missingBlocks[verboseBlock.Hash]; ok { - delete(missingBlocks, verboseBlock.Hash) - } return nil } @@ -937,126 +918,106 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error return nil } -// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later -func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) { - pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded) +type rawAndVerboseBlock struct { + rawBlock string + verboseBlock *btcjson.GetBlockVerboseResult } -// processBlockAddedMsgs processes all pending onBlockAdded messages. -// Messages that cannot yet be processed are re-enqueued. -func processBlockAddedMsgs(client *jsonrpc.Client) error { - var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg - for _, blockAdded := range pendingBlockAddedMsgs { - missingHashes, err := missingParentHashes(blockAdded) - if err != nil { - return errors.Errorf("Could not resolve missing parents: %s", err) - } - for _, missingHash := range missingHashes { - err := handleMissingParent(client, missingHash) - if err != nil { - log.Errorf("Failed to handle missing parent block %s: %s", - missingHash, err) - } - } - if len(missingHashes) > 0 { - unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded) - continue - } +func (r *rawAndVerboseBlock) String() string { + return r.verboseBlock.Hash +} - handleBlockAddedMsg(client, blockAdded) +func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) error { + blocks, err := fetchBlockAndMissingAncestors(client, blockAdded.Header.BlockHash()) + if err != nil { + return err + } + for _, block := range blocks { + err = addBlock(client, block.rawBlock, *block.verboseBlock) + if err != nil { + return err + } + log.Infof("Added block %s", block.verboseBlock.Hash) } - pendingBlockAddedMsgs = unprocessedBlockAddedMsgs return nil } -func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) { - hash := blockAdded.Header.BlockHash() - log.Debugf("Getting block %s from the RPC server", hash) - rawBlock, verboseBlock, err := fetchBlock(client, hash) - if err != nil { - log.Warnf("Could not fetch block %s: %s", hash, err) - return - } - err = addBlock(client, rawBlock, *verboseBlock) - if err != nil { - log.Errorf("Could not insert block %s: %s", hash, err) - return - } - log.Infof("Added block %s", hash) -} - -func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) { - db, err := database.DB() +func fetchBlockAndMissingAncestors(client *jsonrpc.Client, blockHash *daghash.Hash) ([]*rawAndVerboseBlock, error) { + block, err := fetchBlock(client, blockHash) if err != nil { return nil, err } + pendingBlocks := []*rawAndVerboseBlock{block} + blocksToAdd := make([]*rawAndVerboseBlock, 0) + blocksToAddSet := make(map[string]struct{}) + for len(pendingBlocks) > 0 { + var currentBlock *rawAndVerboseBlock + currentBlock, pendingBlocks = pendingBlocks[0], pendingBlocks[1:] + missingHashes, err := missingParentHashes(currentBlock.verboseBlock.ParentHashes) + if err != nil { + return nil, err + } + blocksToPrependToPending := make([]*rawAndVerboseBlock, 0, len(missingHashes)) + for _, missingHash := range missingHashes { + if _, ok := blocksToAddSet[missingHash]; ok { + continue + } + hash, err := daghash.NewHashFromStr(missingHash) + if err != nil { + return nil, err + } + block, err := fetchBlock(client, hash) + if err != nil { + return nil, err + } + blocksToPrependToPending = append(blocksToPrependToPending, block) + } + if len(blocksToPrependToPending) == 0 { + blocksToAddSet[currentBlock.verboseBlock.Hash] = struct{}{} + blocksToAdd = append(blocksToAdd, currentBlock) + continue + } + log.Debugf("Found %s missing parents for block %s and fetched them", blocksToPrependToPending, currentBlock) + blocksToPrependToPending = append(blocksToPrependToPending, currentBlock) + pendingBlocks = append(blocksToPrependToPending, pendingBlocks...) + } + return blocksToAdd, nil +} - // Collect all referenced parent hashes - hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes)) - for _, hash := range blockAdded.Header.ParentHashes { - hashesIn = append(hashesIn, hash.String()) +func missingParentHashes(parentHashes []string) ([]string, error) { + db, err := database.DB() + if err != nil { + return nil, err } // Make sure that all the parent hashes exist in the database var dbParentBlocks []dbmodels.Block dbResult := db. Model(&dbmodels.Block{}). - Where("block_hash in (?)", hashesIn). + Where("block_hash in (?)", parentHashes). Find(&dbParentBlocks) dbErrors := dbResult.GetErrors() if httpserverutils.HasDBError(dbErrors) { return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors) } - if len(hashesIn) != len(dbParentBlocks) { + if len(parentHashes) != len(dbParentBlocks) { // Some parent hashes are missing. Collect and return them - var missingParentHashes []string + var missingHashes []string outerLoop: - for _, hash := range hashesIn { + for _, hash := range parentHashes { for _, dbParentBlock := range dbParentBlocks { if dbParentBlock.BlockHash == hash { continue outerLoop } } - missingParentHashes = append(missingParentHashes, hash) + missingHashes = append(missingHashes, hash) } - return missingParentHashes, nil + return missingHashes, nil } return nil, nil } -// handleMissingParent handles missing parent block hashes as follows: -// a. If it's the first time we've encountered this block hash, add it to -// the missing blocks collection with time = now -// b. Otherwise, if time + blockMissingTimeout < now (that is to say, -// blockMissingTimeout had elapsed) then request the block from the -// node -func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error { - firstMissingTime, ok := missingBlocks[missingParentHash] - if !ok { - log.Infof("Parent block %s is missing", missingParentHash) - missingBlocks[missingParentHash] = time.Now() - return nil - } - if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) { - hash, err := daghash.NewHashFromStr(missingParentHash) - if err != nil { - return errors.Errorf("Could not create hash: %s", err) - } - rawBlock, verboseBlock, err := fetchBlock(client, hash) - if err != nil { - return errors.Errorf("Could not fetch block %s: %s", hash, err) - } - err = addBlock(client, rawBlock, *verboseBlock) - if err != nil { - return errors.Errorf("Could not insert block %s: %s", hash, err) - } - log.Infof("Parent block %s was fetched after missing for over %s", - missingParentHash, blockMissingTimeout) - } - return nil -} - // enqueueChainChangedMsg enqueues onChainChanged messages to be handled later func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) { pendingChainChangedMsgs = append(pendingChainChangedMsgs, chainChanged) diff --git a/testutil/testutil.go b/testutil/testutil.go index b2d25f83f..7c9dd3d2b 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -1,9 +1,5 @@ package testutil -import ( - "fmt" -) - // AreErrorsEqual returns whether errors have the same type // and same error string from .Error(). func AreErrorsEqual(err1, err2 error) bool { @@ -13,8 +9,5 @@ func AreErrorsEqual(err1, err2 error) bool { if err1 == nil && err2 != nil { return false } - if fmt.Sprintf("%T", err1) != fmt.Sprintf("%T", err2) { - return false - } return err1.Error() == err2.Error() } diff --git a/util/daghash/hash.go b/util/daghash/hash.go index 85e06ed6e..7b57775d0 100644 --- a/util/daghash/hash.go +++ b/util/daghash/hash.go @@ -191,7 +191,7 @@ func Decode(dst *Hash, src string) error { var reversedHash Hash _, err := hex.Decode(reversedHash[HashSize-hex.DecodedLen(len(srcBytes)):], srcBytes) if err != nil { - return err + return errors.WithStack(err) } // Reverse copy from the temporary hash to destination. Because the diff --git a/util/daghash/hash_test.go b/util/daghash/hash_test.go index e69b9a88f..8175ff4e0 100644 --- a/util/daghash/hash_test.go +++ b/util/daghash/hash_test.go @@ -7,6 +7,7 @@ package daghash import ( "bytes" "encoding/hex" + "github.com/daglabs/btcd/testutil" "math/big" "reflect" "testing" @@ -214,7 +215,7 @@ func TestNewHashFromStr(t *testing.T) { t.Logf("Running %d tests", len(tests)) for i, test := range tests { result, err := NewHashFromStr(test.in) - if err != test.err { + if !testutil.AreErrorsEqual(err, test.err) { t.Errorf(unexpectedErrStr, i, err, test.err) continue } else if err != nil {