kaspad/apiserver/sync.go
stasatdaglabs adf4b4380e
[NOD-289] Implement API-Server bootstrapping and booting after downtime (#408)
* [NOD-289] Implemented database isCurrent checking and connection.

* [NOD-289] Added GetChainFromBlock to RPCClient.

* [NOD-289] Limited the amount of blocks in GetChainFromBlockResponse.

* [NOD-289] Fixed various issues that were keeping GetChainFromBlocks from working properly.

* [NOD-289] Created blockloop.go.

* [NOD-289] Updated go.mod after merge.

* [NOD-289] Implemented collection of current selected parent chain.

* [NOD-289] Fixed test. Reverted not deleting utxoDiffData from the DB.

* [NOD-289] Implemented GetBlocks.

* [NOD-289] Added comment to BlockHashesFrom.

* [NOD-289] Added GetBlocks to rpcclient.

* [NOD-289] Added verboseBlocks to GetBlocks.

* [NOD-289] Implemented block insertion.

* [NOD-289] Added AUTO_INCREMENT to tables that were missing it.

* [NOD-289] Made gasLimit in subnetwork nullable.

* [NOD-289] Renamed transactions_outputs to transaction_outputs.

* [NOD-289] Fixed weird coinbase behavior in vin.

* [NOD-289] Made collectCurrentBlocks start from the most recent startHash.

* [NOD-289] Added IsChainBlock to GetBlockVerboseResult.

* [NOD-289] Implemented adding a block from onBlockAdded.

* [NOD-289] Added removedParentChainHashes to getChainFromBlock.

* [NOD-289] Implemented updating the selected parent chain from onChainChanged.

* [NOD-289] Implemented some initial logic for updating the UTXO.

* [NOD-289] Fixed merge errors.

* [NOD-326] Fixed some more merge errors.

* [NOD-289] Added error handling for missing required records.

* [NOD-289] Implemented handling removedChainHashes.

* [NOD-289] Implemented handling addedChainBlocks.

* [NOD-289] Fixed incorrect coinbase check.

* [NOD-289] Implemented inserting the transaction output address.

* [NOD-289] Added updating block.IsChainBlock.

* [NOD-289] Split insertBlock into many small functions.

* [NOD-289] Split updateSelectedParentChain into smaller functions.

* [NOD-289] Fixed pointer errors.

* [NOD-289] Fixed a bad exists check.

* [NOD-289] Fixed a couple of small bugs.

* [NOD-289] Fixed a TxID/Hash mixup.

* [NOD-289] Added block/tx mass to getBlockVerboseResponse.

* [NOD-289] Renamed blockLoop.go to sync.go. Added comments.

* [NOD-289] Deleted apiserver README.

* [NOD-289] Fixed golint errors.

* [NOD-289] Renamed findMostRecentBlockHash to findHashOfBluestBlock.

* [NOD-289] Fixed style in syncBlocks and fixed a comment.

* [NOD-289] Copied NewErrorFromDBErrors over from NOD-324.

* [NOD-289] Created a couple of utils to make error handling with gorm slightly less painful.

* [NOD-289] Added error handling for database calls.

* [NOD-289] Fixed some more style/comments.

* [NOD-289] Fixed comments.

* [NOD-289] Renamed TransactionInput.TransactionOutput to TransactionInput.PreviousTransactionOutput.

* [NOD-289] Added comments about pagination in getBlocks and getChainFromBlock.

* [NOD-289] Removed the coinbase field from Vin.

* [NOD-289] Deferred handling chainChangedMsgs until we have the appropriate data.

* [NOD-289] Optimized queries in updateRemovedChainHashes and updateAddedChainBlocks.

* [NOD-289] Optimized queries in insertBlockParents.

* [NOD-289] Optimized queries in insertTransactionInput.

* [NOD-289] Split Where calls to separate lines.

* [NOD-289] Fixed merge errors.

* [NOD-289] Exited early from insertBlockParents if we're the genesis block.

* [NOD-289] Improved nextChainChangedChan mechanism.

* [NOD-289] Fixed the above sync mechanism a bit.

* [NOD-289] Renamed IsDBRecordNotFoundError to HasDBRecordNotFoundError and IsDBError to HasDBError.

* [NOD-289] Replaced old error handling for db errors with the lovely new stuff.

* [NOD-289] Exited early if we already inserted a block. This saves us checking if a record already exists for some record types.

* [NOD-289] Decoupled syncBlocks from syncSelectedParentChain.

* [NOD-289] Made a comment more explicit.

* [NOD-289] Extracted net resolution to a separate function.

* [NOD-289] Extracted syncing to a separate function.

* [NOD-289] Fixed a comment.

* [NOD-289] Fixed merge errors.

* [NOD-289] Fixed a couple of bugs.

* [NOD-289] Fixed another bug.

* [NOD-289] Extracted ChainChangedMsg conversion to a separate function.

* [NOD-289] Optimized queries in canHandleChainChangedMsg.

* [NOD-289] Moved the sync function closer to its call site.

* [NOD-289] Renamed HasDBRecordNotFoundError to IsDBRecordNotFoundError.

* [NOD-289] Used count instead of first.

* [NOD-289] Renamed address to hexAddress.
2019-09-22 13:14:51 +03:00

878 lines
28 KiB
Go

package main
import (
"bytes"
"encoding/hex"
"fmt"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/apiserver/jsonrpc"
"github.com/daglabs/btcd/apiserver/models"
"github.com/daglabs/btcd/apiserver/utils"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/txscript"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/jinzhu/gorm"
"strconv"
"time"
)
// startSync brings the API server up to date with the node and then keeps
// it that way. It obtains the JSON-RPC client, mass-downloads whatever the
// database is missing, and finally hands control to sync, which follows
// node notifications until doneChan is signalled.
//
// An error is returned if the client cannot be obtained or if the initial
// download fails.
func startSync(doneChan chan struct{}) error {
	client, err := jsonrpc.GetClient()
	if err != nil {
		return err
	}

	// Catch up on everything that is missing from the database.
	if err := fetchInitialData(client); err != nil {
		return err
	}

	// From here on, follow the node via its notifications.
	sync(client, doneChan)
	return nil
}
// fetchInitialData fills the database with everything it currently lacks:
// first the blocks themselves, then the state of the selected parent chain.
// The first failing step aborts the process and its error is returned.
func fetchInitialData(client *jsonrpc.Client) error {
	if err := syncBlocks(client); err != nil {
		return err
	}
	return syncSelectedParentChain(client)
}
// sync keeps the API server in sync with the node via notifications. It
// returns once doneChan is signalled.
//
// ChainChangedMsgs must be processed in order and there may be times
// when we may not be able to process them (e.g. appropriate
// BlockAddedMsgs haven't arrived yet). As such, a helper goroutine pops
// messages from client.OnChainChanged, makes sure we're able to handle
// them, and only then pushes them into nextChainChangedChan for them to
// be actually handled by the main loop.
//
// The helper goroutine panics if canHandleChainChangedMsg returns an error.
func sync(client *jsonrpc.Client, doneChan chan struct{}) {
	// blockAddedMsgHandledChan is a wake-up signal, not a queue: it is
	// buffered with capacity one and written to without blocking. A
	// blocking send here would stall the main loop whenever the helper
	// goroutine is idle or is itself waiting to hand a message to the
	// main loop, deadlocking both sides.
	blockAddedMsgHandledChan := make(chan struct{}, 1)
	nextChainChangedChan := make(chan *jsonrpc.ChainChangedMsg)
	spawn(func() {
		for chainChanged := range client.OnChainChanged {
			for {
				// Check first and wait second, so that a message whose
				// blocks are already in the database is not held back
				// until some unrelated block arrives.
				canHandle, err := canHandleChainChangedMsg(chainChanged)
				if err != nil {
					panic(err)
				}
				if canHandle {
					break
				}
				<-blockAddedMsgHandledChan
			}
			nextChainChangedChan <- chainChanged
		}
	})

	// Handle client notifications until we're told to stop
loop:
	for {
		select {
		case blockAdded := <-client.OnBlockAdded:
			handleBlockAddedMsg(client, blockAdded)
			// Non-blocking notify: if a signal is already pending the
			// helper goroutine will re-check anyway.
			select {
			case blockAddedMsgHandledChan <- struct{}{}:
			default:
			}
		case chainChanged := <-nextChainChangedChan:
			handleChainChangedMsg(chainChanged)
		case <-doneChan:
			log.Infof("startSync stopped")
			break loop
		}
	}
}
// syncBlocks downloads every DAG block the database does not have yet and
// inserts them.
//
// The download starts from the block with the highest blue score that is
// already stored (or from the very beginning if the database is empty),
// because blue-block order is the order the node uses in its JSON-RPC
// calls. The node is queried page by page: each page is requested twice,
// once for the serialized blocks and once for the verbose blocks, and the
// last hash of a page becomes the starting point of the next one. Once an
// empty page comes back, everything collected is passed to addBlocks.
func syncBlocks(client *jsonrpc.Client) error {
	cursor, err := findHashOfBluestBlock(false)
	if err != nil {
		return err
	}

	var (
		serializedBlocks []string
		verboseBlocks    []btcjson.GetBlockVerboseResult
	)
	for {
		serializedPage, err := client.GetBlocks(true, false, cursor)
		if err != nil {
			return err
		}
		pageSize := len(serializedPage.Hashes)
		if pageSize == 0 {
			// Nothing more to download; insert what we have.
			return addBlocks(client, serializedBlocks, verboseBlocks)
		}

		verbosePage, err := client.GetBlocks(true, true, cursor)
		if err != nil {
			return err
		}

		cursor = &serializedPage.Hashes[pageSize-1]
		serializedBlocks = append(serializedBlocks, serializedPage.Blocks...)
		verboseBlocks = append(verboseBlocks, verbosePage.RawBlocks...)
	}
}
// syncSelectedParentChain downloads the selected parent chain from the node
// and applies it to the database.
//
// The download starts from the stored chain-block with the highest blue
// score, because blue-block order is the order the node uses in its
// JSON-RPC calls. Results are fetched page by page; every page is applied
// through updateSelectedParentChain and its last added chain block becomes
// the starting point of the next request. An empty page ends the process.
func syncSelectedParentChain(client *jsonrpc.Client) error {
	cursor, err := findHashOfBluestBlock(true)
	if err != nil {
		return err
	}

	for {
		chainPage, err := client.GetChainFromBlock(false, cursor)
		if err != nil {
			return err
		}

		addedChainBlocks := chainPage.AddedChainBlocks
		if len(addedChainBlocks) == 0 {
			return nil
		}

		cursor = &addedChainBlocks[len(addedChainBlocks)-1].Hash
		if err := updateSelectedParentChain(chainPage.RemovedChainBlockHashes, addedChainBlocks); err != nil {
			return err
		}
	}
}
// findHashOfBluestBlock returns the hash of the stored block with the
// highest blue score. When mustBeChainBlock is set, only blocks flagged as
// chain blocks are considered. If no matching block exists (e.g. the
// database is empty), both return values are nil.
func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
	db, err := database.DB()
	if err != nil {
		return nil, err
	}

	query := db.Order("blue_score DESC")
	if mustBeChainBlock {
		query = query.Where(&models.Block{IsChainBlock: true})
	}

	var bluestBlock models.Block
	queryErrors := query.First(&bluestBlock).GetErrors()
	switch {
	case utils.HasDBError(queryErrors):
		return nil, utils.NewErrorFromDBErrors("failed to find hash of bluest block: ", queryErrors)
	case utils.IsDBRecordNotFoundError(queryErrors):
		return nil, nil
	}
	return &bluestBlock.BlockHash, nil
}
// fetchBlock asks the node for the block with hash blockHash in two forms:
// the block itself, which is serialized and returned hex-encoded as block,
// and its verbose representation, which is returned as rawBlock.
// On any failure the error is returned alongside zero values.
func fetchBlock(client *jsonrpc.Client, blockHash *daghash.Hash) (
	block string, rawBlock *btcjson.GetBlockVerboseResult, err error) {

	msgBlock, err := client.GetBlock(blockHash, nil)
	if err != nil {
		return "", nil, err
	}

	// Serialize into a buffer that is pre-sized for the whole block.
	serialized := bytes.NewBuffer(make([]byte, 0, msgBlock.SerializeSize()))
	if err := msgBlock.Serialize(serialized); err != nil {
		return "", nil, err
	}

	verboseBlock, err := client.GetBlockVerboseTx(blockHash, nil)
	if err != nil {
		return "", nil, err
	}

	return hex.EncodeToString(serialized.Bytes()), verboseBlock, nil
}
// addBlocks inserts data in the given blocks and rawBlocks pairwise
// into the database: blocks[i] must be the serialized form of
// rawBlocks[i]. See addBlock for further details.
//
// An error is returned if the two slices differ in length, or as soon as
// inserting one of the pairs fails. Pairs inserted before the failure stay
// inserted, since every block is added in its own database transaction.
func addBlocks(client *jsonrpc.Client, blocks []string, rawBlocks []btcjson.GetBlockVerboseResult) error {
	// The slices are indexed in lockstep below, so a length mismatch would
	// otherwise end in an out-of-range panic.
	if len(blocks) != len(rawBlocks) {
		return fmt.Errorf("cannot add blocks: got %d serialized blocks but %d raw blocks",
			len(blocks), len(rawBlocks))
	}

	for i, rawBlock := range rawBlocks {
		if err := addBlock(client, blocks[i], rawBlock); err != nil {
			return err
		}
	}
	return nil
}
// doesBlockExist reports whether a block with the given hash is already
// stored, looking it up through dbTx. A database error is returned as is;
// a "record not found" result simply yields false.
func doesBlockExist(dbTx *gorm.DB, blockHash string) (bool, error) {
	var storedBlock models.Block
	lookupErrors := dbTx.
		Where(&models.Block{BlockHash: blockHash}).
		First(&storedBlock).
		GetErrors()
	if utils.HasDBError(lookupErrors) {
		return false, utils.NewErrorFromDBErrors("failed to find block: ", lookupErrors)
	}

	notFound := utils.IsDBRecordNotFoundError(lookupErrors)
	return !notFound, nil
}
// addBlock inserts all the data that could be gleaned out of the serialized
// block and raw block data into the database. This includes transactions,
// subnetworks, and addresses.
//
// All the work is done inside a single database transaction that this
// function opens by itself. The transaction is committed only when every
// insertion succeeded and is rolled back otherwise. If the block is already
// stored, nothing is written and nil is returned.
func addBlock(client *jsonrpc.Client, block string, rawBlock btcjson.GetBlockVerboseResult) error {
	db, err := database.DB()
	if err != nil {
		return err
	}

	dbTx := db.Begin()
	if dbErrors := dbTx.GetErrors(); utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to begin database transaction: ", dbErrors)
	}
	// Make sure the transaction never stays open on an early return.
	committed := false
	defer func() {
		if !committed {
			dbTx.Rollback()
		}
	}()

	// Skip this block if it already exists. Nothing has been written at
	// this point, so the deferred rollback just closes the transaction.
	blockExists, err := doesBlockExist(dbTx, rawBlock.Hash)
	if err != nil {
		return err
	}
	if blockExists {
		return nil
	}

	dbBlock, err := insertBlock(dbTx, rawBlock)
	if err != nil {
		return err
	}
	err = insertBlockParents(dbTx, rawBlock, dbBlock)
	if err != nil {
		return err
	}
	err = insertBlockData(dbTx, block, dbBlock)
	if err != nil {
		return err
	}

	for i := range rawBlock.RawTx {
		transaction := &rawBlock.RawTx[i]
		dbSubnetwork, err := insertSubnetwork(dbTx, transaction, client)
		if err != nil {
			return err
		}
		dbTransaction, err := insertTransaction(dbTx, transaction, dbSubnetwork)
		if err != nil {
			return err
		}
		err = insertTransactionBlock(dbTx, dbBlock, dbTransaction, uint32(i))
		if err != nil {
			return err
		}
		err = insertTransactionInputs(dbTx, transaction, dbTransaction)
		if err != nil {
			return err
		}
		err = insertTransactionOutputs(dbTx, transaction, dbTransaction)
		if err != nil {
			return err
		}
	}

	if dbErrors := dbTx.Commit().GetErrors(); utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to commit block: ", dbErrors)
	}
	committed = true
	return nil
}
// insertBlock creates the database record for rawBlock's header data and
// returns it. rawBlock.Bits is expected to be a hexadecimal string that
// fits in 32 bits; a parse failure is returned as an error.
func insertBlock(dbTx *gorm.DB, rawBlock btcjson.GetBlockVerboseResult) (*models.Block, error) {
	parsedBits, err := strconv.ParseUint(rawBlock.Bits, 16, 32)
	if err != nil {
		return nil, err
	}

	newBlock := models.Block{
		BlockHash:            rawBlock.Hash,
		Version:              rawBlock.Version,
		HashMerkleRoot:       rawBlock.HashMerkleRoot,
		AcceptedIDMerkleRoot: rawBlock.AcceptedIDMerkleRoot,
		UTXOCommitment:       rawBlock.UTXOCommitment,
		Timestamp:            time.Unix(rawBlock.Time, 0),
		Bits:                 uint32(parsedBits),
		Nonce:                rawBlock.Nonce,
		BlueScore:            rawBlock.BlueScore,
		// New blocks always start outside the selected parent chain;
		// updateSelectedParentChain relies on this.
		IsChainBlock: false,
		Mass:         rawBlock.Mass,
	}

	createErrors := dbTx.Create(&newBlock).GetErrors()
	if utils.HasDBError(createErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to insert block: ", createErrors)
	}
	return &newBlock, nil
}
// insertBlockParents links dbBlock to each of its parents by inserting one
// ParentBlock record per entry in rawBlock.ParentHashes.
//
// All parents must already be stored; an error is returned if any of them
// is missing. A block without parents (the genesis block) is a no-op.
func insertBlockParents(dbTx *gorm.DB, rawBlock btcjson.GetBlockVerboseResult, dbBlock *models.Block) error {
	// Exit early if this is the genesis block
	if len(rawBlock.ParentHashes) == 0 {
		return nil
	}

	// Fetch all parents in one query. This has to be an explicit IN clause
	// combined with Find: passing a slice of structs to Where yields no
	// usable condition, and First limits the result to a single row.
	var dbParents []models.Block
	dbResult := dbTx.
		Where("block_hash IN (?)", rawBlock.ParentHashes).
		Find(&dbParents)
	dbErrors := dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to find blocks: ", dbErrors)
	}
	if len(dbParents) != len(rawBlock.ParentHashes) {
		return fmt.Errorf("some parents are missing for block: %s", rawBlock.Hash)
	}

	for _, dbParent := range dbParents {
		dbParentBlock := models.ParentBlock{
			BlockID:       dbBlock.ID,
			ParentBlockID: dbParent.ID,
		}
		dbResult := dbTx.Create(&dbParentBlock)
		dbErrors := dbResult.GetErrors()
		if utils.HasDBError(dbErrors) {
			return utils.NewErrorFromDBErrors("failed to insert parentBlock: ", dbErrors)
		}
	}
	return nil
}
// insertBlockData stores the serialized form of a block. block is the
// hex-encoded serialization; it is decoded and saved as a RawBlock record
// that refers to dbBlock.
func insertBlockData(dbTx *gorm.DB, block string, dbBlock *models.Block) error {
	decodedBlock, err := hex.DecodeString(block)
	if err != nil {
		return err
	}

	rawBlockRecord := models.RawBlock{
		BlockID:   dbBlock.ID,
		BlockData: decodedBlock,
	}
	if createErrors := dbTx.Create(&rawBlockRecord).GetErrors(); utils.HasDBError(createErrors) {
		return utils.NewErrorFromDBErrors("failed to insert rawBlock: ", createErrors)
	}
	return nil
}
// insertSubnetwork returns the database record of the subnetwork that
// transaction belongs to. If the subnetwork is not stored yet, its gas
// limit is requested from the node and a new record is created.
func insertSubnetwork(dbTx *gorm.DB, transaction *btcjson.TxRawResult, client *jsonrpc.Client) (*models.Subnetwork, error) {
	var dbSubnetwork models.Subnetwork
	findErrors := dbTx.
		Where(&models.Subnetwork{SubnetworkID: transaction.Subnetwork}).
		First(&dbSubnetwork).
		GetErrors()
	if utils.HasDBError(findErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to find subnetwork: ", findErrors)
	}
	if !utils.IsDBRecordNotFoundError(findErrors) {
		// Already known, nothing to insert.
		return &dbSubnetwork, nil
	}

	nodeSubnetwork, err := client.GetSubnetwork(transaction.Subnetwork)
	if err != nil {
		return nil, err
	}
	dbSubnetwork = models.Subnetwork{
		SubnetworkID: transaction.Subnetwork,
		GasLimit:     nodeSubnetwork.GasLimit,
	}
	createErrors := dbTx.Create(&dbSubnetwork).GetErrors()
	if utils.HasDBError(createErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to insert subnetwork: ", createErrors)
	}
	return &dbSubnetwork, nil
}
// insertTransaction returns the database record of the given transaction,
// identified by its transaction ID. If the transaction is not stored yet, a
// new record is created for it under dbSubnetwork; its hex-encoded payload
// is decoded on the way.
func insertTransaction(dbTx *gorm.DB, transaction *btcjson.TxRawResult, dbSubnetwork *models.Subnetwork) (*models.Transaction, error) {
	var dbTransaction models.Transaction
	findErrors := dbTx.
		Where(&models.Transaction{TransactionID: transaction.TxID}).
		First(&dbTransaction).
		GetErrors()
	if utils.HasDBError(findErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to find transaction: ", findErrors)
	}
	if !utils.IsDBRecordNotFoundError(findErrors) {
		// Already stored, e.g. as part of another block.
		return &dbTransaction, nil
	}

	decodedPayload, err := hex.DecodeString(transaction.Payload)
	if err != nil {
		return nil, err
	}
	dbTransaction = models.Transaction{
		TransactionHash: transaction.Hash,
		TransactionID:   transaction.TxID,
		LockTime:        transaction.LockTime,
		SubnetworkID:    dbSubnetwork.ID,
		Gas:             transaction.Gas,
		Mass:            transaction.Mass,
		PayloadHash:     transaction.PayloadHash,
		Payload:         decodedPayload,
	}
	createErrors := dbTx.Create(&dbTransaction).GetErrors()
	if utils.HasDBError(createErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to insert transaction: ", createErrors)
	}
	return &dbTransaction, nil
}
// insertTransactionBlock records that dbTransaction is included in dbBlock
// at position index. If that link is already stored, nothing is inserted.
func insertTransactionBlock(dbTx *gorm.DB, dbBlock *models.Block, dbTransaction *models.Transaction, index uint32) error {
	var existingLink models.TransactionBlock
	findErrors := dbTx.
		Where(&models.TransactionBlock{TransactionID: dbTransaction.ID, BlockID: dbBlock.ID}).
		First(&existingLink).
		GetErrors()
	if utils.HasDBError(findErrors) {
		return utils.NewErrorFromDBErrors("failed to find transactionBlock: ", findErrors)
	}
	if !utils.IsDBRecordNotFoundError(findErrors) {
		return nil
	}

	newLink := models.TransactionBlock{
		TransactionID: dbTransaction.ID,
		BlockID:       dbBlock.ID,
		Index:         index,
	}
	createErrors := dbTx.Create(&newLink).GetErrors()
	if utils.HasDBError(createErrors) {
		return utils.NewErrorFromDBErrors("failed to insert transactionBlock: ", createErrors)
	}
	return nil
}
// insertTransactionInputs stores every input of the given transaction.
// Inputs of coinbase transactions are skipped entirely.
func insertTransactionInputs(dbTx *gorm.DB, transaction *btcjson.TxRawResult, dbTransaction *models.Transaction) error {
	isCoinbase, err := isTransactionCoinbase(transaction)
	if err != nil {
		return err
	}
	if isCoinbase {
		return nil
	}

	for i := range transaction.Vin {
		if err := insertTransactionInput(dbTx, dbTransaction, &transaction.Vin[i]); err != nil {
			return err
		}
	}
	return nil
}
// isTransactionCoinbase reports whether the given transaction belongs to
// the coinbase subnetwork. An error is returned if the transaction's
// subnetwork string cannot be parsed.
func isTransactionCoinbase(transaction *btcjson.TxRawResult) (bool, error) {
	transactionSubnetworkID, err := subnetworkid.NewFromStr(transaction.Subnetwork)
	if err != nil {
		return false, err
	}

	isCoinbase := transactionSubnetworkID.IsEqual(subnetworkid.SubnetworkIDCoinbase)
	return isCoinbase, nil
}
// insertTransactionInput stores a single input of dbTransaction.
//
// The output that the input spends is looked up by the ID of the
// transaction that created it together with its output index, and it must
// already be stored; otherwise an error is returned. If an input linking
// dbTransaction to that output already exists, nothing is inserted.
func insertTransactionInput(dbTx *gorm.DB, dbTransaction *models.Transaction, input *btcjson.Vin) error {
	var dbPreviousTransactionOutput models.TransactionOutput
	dbResult := dbTx.
		Joins("LEFT JOIN `transactions` ON `transactions`.`id` = `transaction_outputs`.`transaction_id`").
		Where("`transactions`.`transaction_id` = ? AND `transaction_outputs`.`index` = ?", input.TxID, input.Vout).
		First(&dbPreviousTransactionOutput)
	dbErrors := dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to find previous transactionOutput: ", dbErrors)
	}
	if utils.IsDBRecordNotFoundError(dbErrors) {
		return fmt.Errorf("missing output transaction output for txID: %s and index: %d", input.TxID, input.Vout)
	}

	var dbTransactionInputCount int
	dbResult = dbTx.
		Model(&models.TransactionInput{}).
		Where(&models.TransactionInput{TransactionID: dbTransaction.ID, PreviousTransactionOutputID: dbPreviousTransactionOutput.ID}).
		Count(&dbTransactionInputCount)
	dbErrors = dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to find transactionInput: ", dbErrors)
	}
	if dbTransactionInputCount != 0 {
		return nil
	}

	scriptSig, err := hex.DecodeString(input.ScriptSig.Hex)
	if err != nil {
		// A malformed signature script must fail the insertion rather than
		// silently drop the input.
		return err
	}
	dbTransactionInput := models.TransactionInput{
		TransactionID:               dbTransaction.ID,
		PreviousTransactionOutputID: dbPreviousTransactionOutput.ID,
		// NOTE(review): this stores the index of the spent output, not the
		// position of the input within its transaction - confirm that this
		// is what TransactionInput.Index is meant to hold.
		Index:           input.Vout,
		SignatureScript: scriptSig,
		Sequence:        input.Sequence,
	}
	dbResult = dbTx.Create(&dbTransactionInput)
	dbErrors = dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to insert transactionInput: ", dbErrors)
	}
	return nil
}
// insertTransactionOutputs stores every output of the given transaction.
// For each output, the hex-encoded scriptPubKey is decoded, the address it
// pays to is stored (if it isn't already), and the output itself is
// inserted.
func insertTransactionOutputs(dbTx *gorm.DB, transaction *btcjson.TxRawResult, dbTransaction *models.Transaction) error {
	for i := range transaction.Vout {
		output := &transaction.Vout[i]

		decodedScript, err := hex.DecodeString(output.ScriptPubKey.Hex)
		if err != nil {
			return err
		}
		dbAddress, err := insertAddress(dbTx, decodedScript)
		if err != nil {
			return err
		}
		if err := insertTransactionOutput(dbTx, dbTransaction, output, decodedScript, dbAddress); err != nil {
			return err
		}
	}
	return nil
}
// insertAddress returns the database record of the address that the given
// scriptPubKey pays to, creating the record if it is not stored yet.
//
// An error is returned if the script cannot be parsed or if no address can
// be extracted from it.
func insertAddress(dbTx *gorm.DB, scriptPubKey []byte) (*models.Address, error) {
	_, addr, err := txscript.ExtractScriptPubKeyAddress(scriptPubKey, config.ActiveNetParams())
	if err != nil {
		return nil, err
	}
	// NOTE(review): the extractor presumably yields a nil address (with a
	// nil error) for non-standard scripts - confirm. Guard against it so
	// that such a script fails the insertion instead of crashing the
	// process on a nil dereference.
	if addr == nil {
		return nil, fmt.Errorf("could not extract an address from scriptPubKey: %x", scriptPubKey)
	}
	hexAddress := addr.EncodeAddress()

	var dbAddress models.Address
	dbResult := dbTx.
		Where(&models.Address{Address: hexAddress}).
		First(&dbAddress)
	dbErrors := dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to find address: ", dbErrors)
	}
	if !utils.IsDBRecordNotFoundError(dbErrors) {
		return &dbAddress, nil
	}

	dbAddress = models.Address{
		Address: hexAddress,
	}
	dbResult = dbTx.Create(&dbAddress)
	dbErrors = dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return nil, utils.NewErrorFromDBErrors("failed to insert address: ", dbErrors)
	}
	return &dbAddress, nil
}
// insertTransactionOutput stores a single output of dbTransaction together
// with its decoded scriptPubKey and the address it pays to. If an output
// with the same index is already stored for this transaction, nothing is
// inserted.
func insertTransactionOutput(dbTx *gorm.DB, dbTransaction *models.Transaction,
	output *btcjson.Vout, scriptPubKey []byte, dbAddress *models.Address) error {

	var existingCount int
	countErrors := dbTx.
		Model(&models.TransactionOutput{}).
		Where(&models.TransactionOutput{TransactionID: dbTransaction.ID, Index: output.N}).
		Count(&existingCount).
		GetErrors()
	if utils.HasDBError(countErrors) {
		return utils.NewErrorFromDBErrors("failed to find transactionOutput: ", countErrors)
	}
	if existingCount != 0 {
		return nil
	}

	newOutput := models.TransactionOutput{
		TransactionID: dbTransaction.ID,
		Index:         output.N,
		Value:         output.Value,
		// New outputs always start unspent; updateSelectedParentChain
		// relies on this.
		IsSpent:      false,
		ScriptPubKey: scriptPubKey,
		AddressID:    dbAddress.ID,
	}
	createErrors := dbTx.Create(&newOutput).GetErrors()
	if utils.HasDBError(createErrors) {
		return utils.NewErrorFromDBErrors("failed to insert transactionOutput: ", createErrors)
	}
	return nil
}
// updateSelectedParentChain updates the database to reflect the current selected
// parent chain. First it "unaccepts" all removedChainHashes and then it "accepts"
// all addedChainBlocks.
//
// All the work is done inside a single database transaction that this
// function opens by itself. The transaction is committed only when every
// update succeeded and is rolled back otherwise, so a failure never leaves
// the chain half-updated.
func updateSelectedParentChain(removedChainHashes []string, addedChainBlocks []btcjson.ChainBlock) error {
	db, err := database.DB()
	if err != nil {
		return err
	}

	dbTx := db.Begin()
	if dbErrors := dbTx.GetErrors(); utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to begin database transaction: ", dbErrors)
	}
	// Make sure the transaction never stays open on an early return.
	committed := false
	defer func() {
		if !committed {
			dbTx.Rollback()
		}
	}()

	for _, removedHash := range removedChainHashes {
		err := updateRemovedChainHashes(dbTx, removedHash)
		if err != nil {
			return err
		}
	}
	for i := range addedChainBlocks {
		err := updateAddedChainBlocks(dbTx, &addedChainBlocks[i])
		if err != nil {
			return err
		}
	}

	if dbErrors := dbTx.Commit().GetErrors(); utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to commit selected parent chain update: ", dbErrors)
	}
	committed = true
	return nil
}
// updateRemovedChainHashes takes the block with the given removedHash out
// of the selected parent chain ("unaccepts" it):
// * Every output spent by a transaction it accepted goes back to
//   IsSpent = false
// * Every transaction it accepted gets AcceptingBlockID = nil
// * The block itself gets IsChainBlock = false
// An error is returned if the block is missing, if it isn't currently
// marked as a chain block, or if one of the outputs to un-spend is already
// unspent.
func updateRemovedChainHashes(dbTx *gorm.DB, removedHash string) error {
	var removedBlock models.Block
	findBlockErrors := dbTx.
		Where(&models.Block{BlockHash: removedHash}).
		First(&removedBlock).
		GetErrors()
	switch {
	case utils.HasDBError(findBlockErrors):
		return utils.NewErrorFromDBErrors("failed to find block: ", findBlockErrors)
	case utils.IsDBRecordNotFoundError(findBlockErrors):
		return fmt.Errorf("missing block for hash: %s", removedHash)
	case !removedBlock.IsChainBlock:
		return fmt.Errorf("block erroneously marked as not a chain block: %s", removedHash)
	}

	// Load the transactions this block accepted, along with the outputs
	// their inputs spend.
	var acceptedTransactions []models.Transaction
	findTransactionsErrors := dbTx.
		Where(&models.Transaction{AcceptingBlockID: &removedBlock.ID}).
		Preload("TransactionInputs.PreviousTransactionOutput").
		Find(&acceptedTransactions).
		GetErrors()
	if utils.HasDBError(findTransactionsErrors) {
		return utils.NewErrorFromDBErrors("failed to find transactions: ", findTransactionsErrors)
	}

	for _, acceptedTransaction := range acceptedTransactions {
		for _, transactionInput := range acceptedTransaction.TransactionInputs {
			spentOutput := transactionInput.PreviousTransactionOutput
			if !spentOutput.IsSpent {
				return fmt.Errorf("cannot de-spend an unspent transaction output: %s index: %d",
					acceptedTransaction.TransactionID, transactionInput.Index)
			}

			spentOutput.IsSpent = false
			saveOutputErrors := dbTx.Save(&spentOutput).GetErrors()
			if utils.HasDBError(saveOutputErrors) {
				return utils.NewErrorFromDBErrors("failed to update transactionOutput: ", saveOutputErrors)
			}
		}

		acceptedTransaction.AcceptingBlockID = nil
		saveTransactionErrors := dbTx.Save(&acceptedTransaction).GetErrors()
		if utils.HasDBError(saveTransactionErrors) {
			return utils.NewErrorFromDBErrors("failed to update transaction: ", saveTransactionErrors)
		}
	}

	removedBlock.IsChainBlock = false
	saveBlockErrors := dbTx.Save(&removedBlock).GetErrors()
	if utils.HasDBError(saveBlockErrors) {
		return utils.NewErrorFromDBErrors("failed to update block: ", saveBlockErrors)
	}
	return nil
}
// updateAddedChainBlocks "accepts" the given addedBlock. That is to say,
// it marks it as in the selected parent chain in the following ways:
// * All its TransactionInputs.PreviousTransactionOutputs are set IsSpent = true
// * All its Transactions are set AcceptingBlockID = addedBlock
// * The block is set IsChainBlock = true
// "Its transactions" are the transactions listed under
// addedBlock.AcceptedBlocks.
// This function will return an error if any of the above are in an unexpected state
func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error {
	// The accepting block is addedBlock itself, not the blocks whose
	// transactions it accepts. This mirrors updateRemovedChainHashes, which
	// looks transactions up by the chain block's ID.
	var dbAddedBlock models.Block
	dbResult := dbTx.
		Where(&models.Block{BlockHash: addedBlock.Hash}).
		First(&dbAddedBlock)
	dbErrors := dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to find block: ", dbErrors)
	}
	if utils.IsDBRecordNotFoundError(dbErrors) {
		return fmt.Errorf("missing block for hash: %s", addedBlock.Hash)
	}
	if dbAddedBlock.IsChainBlock {
		return fmt.Errorf("block erroneously marked as a chain block: %s", addedBlock.Hash)
	}

	for _, acceptedBlock := range addedBlock.AcceptedBlocks {
		// Nothing to accept from this block; also avoids an empty IN list.
		if len(acceptedBlock.AcceptedTxIDs) == 0 {
			continue
		}

		// Fetch all accepted transactions in one query. This has to be an
		// explicit IN clause combined with Find: passing a slice of structs
		// to Where yields no usable condition, and First limits the result
		// to a single row.
		var dbAcceptedTransactions []models.Transaction
		dbResult := dbTx.
			Where("`transactions`.`transaction_id` IN (?)", acceptedBlock.AcceptedTxIDs).
			Preload("TransactionInputs.PreviousTransactionOutput").
			Find(&dbAcceptedTransactions)
		dbErrors := dbResult.GetErrors()
		if utils.HasDBError(dbErrors) {
			return utils.NewErrorFromDBErrors("failed to find transactions: ", dbErrors)
		}
		if len(dbAcceptedTransactions) != len(acceptedBlock.AcceptedTxIDs) {
			return fmt.Errorf("some transaction are missing for block: %s", acceptedBlock.Hash)
		}

		for _, dbAcceptedTransaction := range dbAcceptedTransactions {
			for _, dbTransactionInput := range dbAcceptedTransaction.TransactionInputs {
				dbPreviousTransactionOutput := dbTransactionInput.PreviousTransactionOutput
				if dbPreviousTransactionOutput.IsSpent {
					return fmt.Errorf("cannot spend an already spent transaction output: %s index: %d",
						dbAcceptedTransaction.TransactionID, dbTransactionInput.Index)
				}
				dbPreviousTransactionOutput.IsSpent = true
				dbResult = dbTx.Save(&dbPreviousTransactionOutput)
				dbErrors = dbResult.GetErrors()
				if utils.HasDBError(dbErrors) {
					return utils.NewErrorFromDBErrors("failed to update transactionOutput: ", dbErrors)
				}
			}

			dbAcceptedTransaction.AcceptingBlockID = &dbAddedBlock.ID
			dbResult = dbTx.Save(&dbAcceptedTransaction)
			dbErrors = dbResult.GetErrors()
			if utils.HasDBError(dbErrors) {
				return utils.NewErrorFromDBErrors("failed to update transaction: ", dbErrors)
			}
		}
	}

	dbAddedBlock.IsChainBlock = true
	dbResult = dbTx.Save(&dbAddedBlock)
	dbErrors = dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return utils.NewErrorFromDBErrors("failed to update block: ", dbErrors)
	}
	return nil
}
// handleBlockAddedMsg reacts to an onBlockAdded notification: it fetches
// the announced block from the node and inserts it into the database.
// Failures are logged as warnings and otherwise ignored.
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
	blockHash := blockAdded.Header.BlockHash()

	serializedBlock, verboseBlock, err := fetchBlock(client, blockHash)
	if err != nil {
		log.Warnf("Could not fetch block %s: %s", blockHash, err)
		return
	}

	if err := addBlock(client, serializedBlock, *verboseBlock); err != nil {
		log.Warnf("Could not insert block %s: %s", blockHash, err)
		return
	}

	log.Infof("Added block %s", blockHash)
}
// canHandleChainChangedMsg checks whether we have all the necessary data
// to successfully handle a ChainChangedMsg. That is the case when every
// block the message refers to - removed chain blocks, added chain blocks
// and the blocks those accept - is already stored in the database.
func canHandleChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) (bool, error) {
	db, err := database.DB()
	if err != nil {
		return false, err
	}

	// Collect all unique referenced block hashes
	hashes := make(map[string]struct{})
	for _, removedHash := range chainChanged.RemovedChainBlockHashes {
		hashes[removedHash.String()] = struct{}{}
	}
	for _, addedBlock := range chainChanged.AddedChainBlocks {
		hashes[addedBlock.Hash.String()] = struct{}{}
		for _, acceptedBlock := range addedBlock.AcceptedBlocks {
			hashes[acceptedBlock.Hash.String()] = struct{}{}
		}
	}

	// A message that refers to no blocks needs no data at all.
	if len(hashes) == 0 {
		return true, nil
	}

	hashesIn := make([]string, 0, len(hashes))
	for hash := range hashes {
		hashesIn = append(hashesIn, hash)
	}

	// Make sure that all the hashes exist in the database. The query needs
	// an explicit Model (otherwise Count has no table to work on) and an
	// explicit IN clause (a slice of structs passed to Where yields no
	// usable condition).
	var dbBlocksCount int
	dbResult := db.
		Model(&models.Block{}).
		Where("block_hash IN (?)", hashesIn).
		Count(&dbBlocksCount)
	dbErrors := dbResult.GetErrors()
	if utils.HasDBError(dbErrors) {
		return false, utils.NewErrorFromDBErrors("failed to find block count: ", dbErrors)
	}

	return len(hashes) == dbBlocksCount, nil
}
// handleChainChangedMsg handles onChainChanged messages by applying the
// removed and added chain blocks they carry to the database. A failure is
// logged as a warning and otherwise ignored.
func handleChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {
	// Convert the data in chainChanged to something we can feed into
	// updateSelectedParentChain
	removedHashes, addedBlocks := convertChainChangedMsg(chainChanged)

	err := updateSelectedParentChain(removedHashes, addedBlocks)
	if err != nil {
		log.Warnf("Could not update selected parent chain: %s", err)
		return
	}
	// Both counts need a %d verb; the first one used to be mistyped as
	// "&d", which garbled the log line.
	log.Infof("Chain changed: removed %d blocks and added %d block",
		len(removedHashes), len(addedBlocks))
}
// convertChainChangedMsg translates a ChainChangedMsg into the
// string-based representation that updateSelectedParentChain works with:
// the hashes of the removed chain blocks, and the added chain blocks
// together with the blocks and transaction IDs each of them accepts.
// The order of all elements is preserved.
func convertChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) (
	removedHashes []string, addedBlocks []btcjson.ChainBlock) {

	removedHashes = make([]string, 0, len(chainChanged.RemovedChainBlockHashes))
	for _, removedHash := range chainChanged.RemovedChainBlockHashes {
		removedHashes = append(removedHashes, removedHash.String())
	}

	addedBlocks = make([]btcjson.ChainBlock, 0, len(chainChanged.AddedChainBlocks))
	for _, msgAddedBlock := range chainChanged.AddedChainBlocks {
		convertedAcceptedBlocks := make([]btcjson.AcceptedBlock, 0, len(msgAddedBlock.AcceptedBlocks))
		for _, msgAcceptedBlock := range msgAddedBlock.AcceptedBlocks {
			txIDs := make([]string, 0, len(msgAcceptedBlock.AcceptedTxIDs))
			for _, txID := range msgAcceptedBlock.AcceptedTxIDs {
				txIDs = append(txIDs, txID.String())
			}
			convertedAcceptedBlocks = append(convertedAcceptedBlocks, btcjson.AcceptedBlock{
				Hash:          msgAcceptedBlock.Hash.String(),
				AcceptedTxIDs: txIDs,
			})
		}
		addedBlocks = append(addedBlocks, btcjson.ChainBlock{
			Hash:           msgAddedBlock.Hash.String(),
			AcceptedBlocks: convertedAcceptedBlocks,
		})
	}

	return removedHashes, addedBlocks
}