kaspad/blockdag/dagio.go
Svarog 378f0b659a
[NOD-993] Get rid of redundant error types + Use %+v when printing startup errors (#719)
* [NOD-993] Use %+v when printing errors

* [NOD-993] Get rid of AssertError

* [NOD-993] Made ruleError use github.com/pkg/errors

* [NOD-993] remove redundant TODO

* [NOD-993] remove redundant Comment

* [NOD-993] Removed DeploymentError
2020-05-13 17:27:53 +03:00

651 lines
18 KiB
Go

// Copyright (c) 2015-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package blockdag
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/dbaccess"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/binaryserializer"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
)
var (
// byteOrder is the preferred byte order used for serializing numeric
// fields for storage in the database.
byteOrder = binary.LittleEndian
)
// errNotInDAG signifies that a block hash or height that is not in the
// DAG was requested.
type errNotInDAG string
// Error implements the error interface.
func (e errNotInDAG) Error() string {
return string(e)
}
// isNotInDAGErr returns whether or not the passed error is an
// errNotInDAG error.
func isNotInDAGErr(err error) bool {
var notInDAGErr errNotInDAG
return errors.As(err, &notInDAGErr)
}
// outpointIndexByteOrder is the byte order for serializing the outpoint index.
// It uses big endian to ensure that when outpoint is used as database key, the
// keys will be iterated in an ascending order by the outpoint index.
var outpointIndexByteOrder = binary.BigEndian
func serializeOutpoint(w io.Writer, outpoint *wire.Outpoint) error {
_, err := w.Write(outpoint.TxID[:])
if err != nil {
return err
}
return binaryserializer.PutUint32(w, outpointIndexByteOrder, outpoint.Index)
}
var outpointSerializeSize = daghash.TxIDSize + 4
// deserializeOutpoint decodes an outpoint from the passed serialized byte
// slice into a new wire.Outpoint using a format that is suitable for long-
// term storage. This format is described in detail above.
func deserializeOutpoint(r io.Reader) (*wire.Outpoint, error) {
outpoint := &wire.Outpoint{}
_, err := r.Read(outpoint.TxID[:])
if err != nil {
return nil, err
}
outpoint.Index, err = binaryserializer.Uint32(r, outpointIndexByteOrder)
if err != nil {
return nil, err
}
return outpoint, nil
}
// updateUTXOSet updates the UTXO set in the database based on the provided
// UTXO diff.
func updateUTXOSet(dbContext dbaccess.Context, virtualUTXODiff *UTXODiff) error {
outpointBuff := bytes.NewBuffer(make([]byte, outpointSerializeSize))
for outpoint := range virtualUTXODiff.toRemove {
outpointBuff.Reset()
err := serializeOutpoint(outpointBuff, &outpoint)
if err != nil {
return err
}
key := outpointBuff.Bytes()
err = dbaccess.RemoveFromUTXOSet(dbContext, key)
if err != nil {
return err
}
}
// We are preallocating for P2PKH entries because they are the most common ones.
// If we have entries with a compressed script bigger than P2PKH's, the buffer will grow.
utxoEntryBuff := bytes.NewBuffer(make([]byte, p2pkhUTXOEntrySerializeSize))
for outpoint, entry := range virtualUTXODiff.toAdd {
utxoEntryBuff.Reset()
outpointBuff.Reset()
// Serialize and store the UTXO entry.
err := serializeUTXOEntry(utxoEntryBuff, entry)
if err != nil {
return err
}
serializedEntry := utxoEntryBuff.Bytes()
err = serializeOutpoint(outpointBuff, &outpoint)
if err != nil {
return err
}
key := outpointBuff.Bytes()
err = dbaccess.AddToUTXOSet(dbContext, key, serializedEntry)
if err != nil {
return err
}
}
return nil
}
type dagState struct {
TipHashes []*daghash.Hash
LastFinalityPoint *daghash.Hash
LocalSubnetworkID *subnetworkid.SubnetworkID
}
// serializeDAGState returns the serialization of the DAG state.
// This is data to be stored in the DAG state bucket.
func serializeDAGState(state *dagState) ([]byte, error) {
return json.Marshal(state)
}
// deserializeDAGState deserializes the passed serialized DAG state.
// This is data stored in the DAG state bucket and is updated after
// every block is connected to the DAG.
func deserializeDAGState(serializedData []byte) (*dagState, error) {
var state *dagState
err := json.Unmarshal(serializedData, &state)
if err != nil {
return nil, err
}
return state, nil
}
// saveDAGState uses an existing database context to store the latest
// tip hashes of the DAG.
func saveDAGState(dbContext dbaccess.Context, state *dagState) error {
serializedDAGState, err := serializeDAGState(state)
if err != nil {
return err
}
return dbaccess.StoreDAGState(dbContext, serializedDAGState)
}
// createDAGState initializes the DAG state to the
// genesis block and the node's local subnetwork id.
func (dag *BlockDAG) createDAGState(localSubnetworkID *subnetworkid.SubnetworkID) error {
return saveDAGState(dbaccess.NoTx(), &dagState{
TipHashes: []*daghash.Hash{dag.dagParams.GenesisHash},
LastFinalityPoint: dag.dagParams.GenesisHash,
LocalSubnetworkID: localSubnetworkID,
})
}
// initDAGState attempts to load and initialize the DAG state from the
// database. When the db does not yet contain any DAG state, both it and the
// DAG state are initialized to the genesis block.
func (dag *BlockDAG) initDAGState() error {
// Fetch the stored DAG state from the database. If it doesn't exist,
// it means that kaspad is running for the first time.
serializedDAGState, err := dbaccess.FetchDAGState(dbaccess.NoTx())
if dbaccess.IsNotFoundError(err) {
// Initialize the database and the DAG state to the genesis block.
return dag.createDAGState(dag.subnetworkID)
}
if err != nil {
return err
}
dagState, err := deserializeDAGState(serializedDAGState)
if err != nil {
return err
}
err = dag.validateLocalSubnetworkID(dagState)
if err != nil {
return err
}
log.Debugf("Loading block index...")
unprocessedBlockNodes, err := dag.initBlockIndex()
if err != nil {
return err
}
log.Debugf("Loading UTXO set...")
fullUTXOCollection, err := dag.initUTXOSet()
if err != nil {
return err
}
log.Debugf("Loading reachability data...")
err = dag.reachabilityStore.init(dbaccess.NoTx())
if err != nil {
return err
}
log.Debugf("Loading multiset data...")
err = dag.multisetStore.init(dbaccess.NoTx())
if err != nil {
return err
}
log.Debugf("Applying the loaded utxoCollection to the virtual block...")
dag.virtual.utxoSet, err = newFullUTXOSetFromUTXOCollection(fullUTXOCollection)
if err != nil {
return errors.Wrap(err, "Error loading UTXOSet")
}
log.Debugf("Applying the stored tips to the virtual block...")
err = dag.initVirtualBlockTips(dagState)
if err != nil {
return err
}
log.Debugf("Setting the last finality point...")
dag.lastFinalityPoint = dag.index.LookupNode(dagState.LastFinalityPoint)
dag.finalizeNodesBelowFinalityPoint(false)
log.Debugf("Processing unprocessed blockNodes...")
err = dag.processUnprocessedBlockNodes(unprocessedBlockNodes)
if err != nil {
return err
}
log.Infof("DAG state initialized.")
return nil
}
func (dag *BlockDAG) validateLocalSubnetworkID(state *dagState) error {
if !state.LocalSubnetworkID.IsEqual(dag.subnetworkID) {
return errors.Errorf("Cannot start kaspad with subnetwork ID %s because"+
" its database is already built with subnetwork ID %s. If you"+
" want to switch to a new database, please reset the"+
" database by starting kaspad with --reset-db flag", dag.subnetworkID, state.LocalSubnetworkID)
}
return nil
}
func (dag *BlockDAG) initBlockIndex() (unprocessedBlockNodes []*blockNode, err error) {
blockIndexCursor, err := dbaccess.BlockIndexCursor(dbaccess.NoTx())
if err != nil {
return nil, err
}
defer blockIndexCursor.Close()
for blockIndexCursor.Next() {
serializedDBNode, err := blockIndexCursor.Value()
if err != nil {
return nil, err
}
node, err := dag.deserializeBlockNode(serializedDBNode)
if err != nil {
return nil, err
}
// Check to see if this node had been stored in the the block DB
// but not yet accepted. If so, add it to a slice to be processed later.
if node.status == statusDataStored {
unprocessedBlockNodes = append(unprocessedBlockNodes, node)
continue
}
// If the node is known to be invalid add it as-is to the block
// index and continue.
if node.status.KnownInvalid() {
dag.index.addNode(node)
continue
}
if dag.blockCount == 0 {
if !node.hash.IsEqual(dag.dagParams.GenesisHash) {
return nil, errors.Errorf("Expected "+
"first entry in block index to be genesis block, "+
"found %s", node.hash)
}
} else {
if len(node.parents) == 0 {
return nil, errors.Errorf("block %s "+
"has no parents but it's not the genesis block", node.hash)
}
}
// Add the node to its parents children, connect it,
// and add it to the block index.
node.updateParentsChildren()
dag.index.addNode(node)
dag.blockCount++
}
return unprocessedBlockNodes, nil
}
func (dag *BlockDAG) initUTXOSet() (fullUTXOCollection utxoCollection, err error) {
fullUTXOCollection = make(utxoCollection)
cursor, err := dbaccess.UTXOSetCursor(dbaccess.NoTx())
if err != nil {
return nil, err
}
defer cursor.Close()
for cursor.Next() {
// Deserialize the outpoint
key, err := cursor.Key()
if err != nil {
return nil, err
}
outpoint, err := deserializeOutpoint(bytes.NewReader(key.Suffix()))
if err != nil {
return nil, err
}
// Deserialize the utxo entry
value, err := cursor.Value()
if err != nil {
return nil, err
}
entry, err := deserializeUTXOEntry(bytes.NewReader(value))
if err != nil {
return nil, err
}
fullUTXOCollection[*outpoint] = entry
}
return fullUTXOCollection, nil
}
func (dag *BlockDAG) initVirtualBlockTips(state *dagState) error {
tips := newBlockSet()
for _, tipHash := range state.TipHashes {
tip := dag.index.LookupNode(tipHash)
if tip == nil {
return errors.Errorf("cannot find "+
"DAG tip %s in block index", state.TipHashes)
}
tips.add(tip)
}
dag.virtual.SetTips(tips)
return nil
}
func (dag *BlockDAG) processUnprocessedBlockNodes(unprocessedBlockNodes []*blockNode) error {
for _, node := range unprocessedBlockNodes {
// Check to see if the block exists in the block DB. If it
// doesn't, the database has certainly been corrupted.
blockExists, err := dbaccess.HasBlock(dbaccess.NoTx(), node.hash)
if err != nil {
return errors.Wrapf(err, "HasBlock "+
"for block %s failed: %s", node.hash, err)
}
if !blockExists {
return errors.Errorf("block %s "+
"exists in block index but not in block db", node.hash)
}
// Attempt to accept the block.
block, err := fetchBlockByHash(dbaccess.NoTx(), node.hash)
if err != nil {
return err
}
isOrphan, isDelayed, err := dag.ProcessBlock(block, BFWasStored)
if err != nil {
log.Warnf("Block %s, which was not previously processed, "+
"failed to be accepted to the DAG: %s", node.hash, err)
continue
}
// If the block is an orphan or is delayed then it couldn't have
// possibly been written to the block index in the first place.
if isOrphan {
return errors.Errorf("Block %s, which was not "+
"previously processed, turned out to be an orphan, which is "+
"impossible.", node.hash)
}
if isDelayed {
return errors.Errorf("Block %s, which was not "+
"previously processed, turned out to be delayed, which is "+
"impossible.", node.hash)
}
}
return nil
}
// deserializeBlockNode parses a value in the block index bucket and returns a block node.
func (dag *BlockDAG) deserializeBlockNode(blockRow []byte) (*blockNode, error) {
buffer := bytes.NewReader(blockRow)
var header wire.BlockHeader
err := header.Deserialize(buffer)
if err != nil {
return nil, err
}
node := &blockNode{
hash: header.BlockHash(),
version: header.Version,
bits: header.Bits,
nonce: header.Nonce,
timestamp: header.Timestamp.Unix(),
hashMerkleRoot: header.HashMerkleRoot,
acceptedIDMerkleRoot: header.AcceptedIDMerkleRoot,
utxoCommitment: header.UTXOCommitment,
}
node.children = newBlockSet()
node.parents = newBlockSet()
for _, hash := range header.ParentHashes {
parent := dag.index.LookupNode(hash)
if parent == nil {
return nil, errors.Errorf("deserializeBlockNode: Could "+
"not find parent %s for block %s", hash, header.BlockHash())
}
node.parents.add(parent)
}
statusByte, err := buffer.ReadByte()
if err != nil {
return nil, err
}
node.status = blockStatus(statusByte)
selectedParentHash := &daghash.Hash{}
if _, err := io.ReadFull(buffer, selectedParentHash[:]); err != nil {
return nil, err
}
// Because genesis doesn't have selected parent, it's serialized as zero hash
if !selectedParentHash.IsEqual(&daghash.ZeroHash) {
node.selectedParent = dag.index.LookupNode(selectedParentHash)
}
node.blueScore, err = binaryserializer.Uint64(buffer, byteOrder)
if err != nil {
return nil, err
}
bluesCount, err := wire.ReadVarInt(buffer)
if err != nil {
return nil, err
}
node.blues = make([]*blockNode, bluesCount)
for i := uint64(0); i < bluesCount; i++ {
hash := &daghash.Hash{}
if _, err := io.ReadFull(buffer, hash[:]); err != nil {
return nil, err
}
node.blues[i] = dag.index.LookupNode(hash)
}
bluesAnticoneSizesLen, err := wire.ReadVarInt(buffer)
if err != nil {
return nil, err
}
node.bluesAnticoneSizes = make(map[*blockNode]dagconfig.KType)
for i := uint64(0); i < bluesAnticoneSizesLen; i++ {
hash := &daghash.Hash{}
if _, err := io.ReadFull(buffer, hash[:]); err != nil {
return nil, err
}
bluesAnticoneSize, err := binaryserializer.Uint8(buffer)
if err != nil {
return nil, err
}
blue := dag.index.LookupNode(hash)
if blue == nil {
return nil, errors.Errorf("couldn't find block with hash %s", hash)
}
node.bluesAnticoneSizes[blue] = dagconfig.KType(bluesAnticoneSize)
}
return node, nil
}
// fetchBlockByHash retrieves the raw block for the provided hash,
// deserializes it, and returns a util.Block of it.
func fetchBlockByHash(dbContext dbaccess.Context, hash *daghash.Hash) (*util.Block, error) {
blockBytes, err := dbaccess.FetchBlock(dbContext, hash)
if err != nil {
return nil, err
}
return util.NewBlockFromBytes(blockBytes)
}
func storeBlock(dbContext *dbaccess.TxContext, block *util.Block) error {
blockBytes, err := block.Bytes()
if err != nil {
return err
}
return dbaccess.StoreBlock(dbContext, block.Hash(), blockBytes)
}
func serializeBlockNode(node *blockNode) ([]byte, error) {
w := bytes.NewBuffer(make([]byte, 0, wire.MaxBlockHeaderPayload+1))
header := node.Header()
err := header.Serialize(w)
if err != nil {
return nil, err
}
err = w.WriteByte(byte(node.status))
if err != nil {
return nil, err
}
// Because genesis doesn't have selected parent, it's serialized as zero hash
selectedParentHash := &daghash.ZeroHash
if node.selectedParent != nil {
selectedParentHash = node.selectedParent.hash
}
_, err = w.Write(selectedParentHash[:])
if err != nil {
return nil, err
}
err = binaryserializer.PutUint64(w, byteOrder, node.blueScore)
if err != nil {
return nil, err
}
err = wire.WriteVarInt(w, uint64(len(node.blues)))
if err != nil {
return nil, err
}
for _, blue := range node.blues {
_, err = w.Write(blue.hash[:])
if err != nil {
return nil, err
}
}
err = wire.WriteVarInt(w, uint64(len(node.bluesAnticoneSizes)))
if err != nil {
return nil, err
}
for blue, blueAnticoneSize := range node.bluesAnticoneSizes {
_, err = w.Write(blue.hash[:])
if err != nil {
return nil, err
}
err = binaryserializer.PutUint8(w, uint8(blueAnticoneSize))
if err != nil {
return nil, err
}
}
return w.Bytes(), nil
}
// blockIndexKey generates the binary key for an entry in the block index
// bucket. The key is composed of the block blue score encoded as a big-endian
// 64-bit unsigned int followed by the 32 byte block hash.
// The blue score component is important for iteration order.
func blockIndexKey(blockHash *daghash.Hash, blueScore uint64) []byte {
indexKey := make([]byte, daghash.HashSize+8)
binary.BigEndian.PutUint64(indexKey[0:8], blueScore)
copy(indexKey[8:daghash.HashSize+8], blockHash[:])
return indexKey
}
func blockHashFromBlockIndexKey(BlockIndexKey []byte) (*daghash.Hash, error) {
return daghash.NewHash(BlockIndexKey[8 : daghash.HashSize+8])
}
// BlockByHash returns the block from the DAG with the given hash.
//
// This function is safe for concurrent access.
func (dag *BlockDAG) BlockByHash(hash *daghash.Hash) (*util.Block, error) {
// Lookup the block hash in block index and ensure it is in the DAG
node := dag.index.LookupNode(hash)
if node == nil {
str := fmt.Sprintf("block %s is not in the DAG", hash)
return nil, errNotInDAG(str)
}
block, err := fetchBlockByHash(dbaccess.NoTx(), node.hash)
if err != nil {
return nil, err
}
return block, err
}
// BlockHashesFrom returns a slice of blocks starting from lowHash
// ordered by blueScore. If lowHash is nil then the genesis block is used.
//
// This method MUST be called with the DAG lock held
func (dag *BlockDAG) BlockHashesFrom(lowHash *daghash.Hash, limit int) ([]*daghash.Hash, error) {
blockHashes := make([]*daghash.Hash, 0, limit)
if lowHash == nil {
lowHash = dag.genesis.hash
// If we're starting from the beginning we should include the
// genesis hash in the result
blockHashes = append(blockHashes, dag.genesis.hash)
}
if !dag.IsInDAG(lowHash) {
return nil, errors.Errorf("block %s not found", lowHash)
}
blueScore, err := dag.BlueScoreByBlockHash(lowHash)
if err != nil {
return nil, err
}
key := blockIndexKey(lowHash, blueScore)
cursor, err := dbaccess.BlockIndexCursorFrom(dbaccess.NoTx(), key)
if dbaccess.IsNotFoundError(err) {
return nil, errors.Wrapf(err, "block %s not in block index", lowHash)
}
if err != nil {
return nil, err
}
defer cursor.Close()
for cursor.Next() && len(blockHashes) < limit {
key, err := cursor.Key()
if err != nil {
return nil, err
}
blockHash, err := blockHashFromBlockIndexKey(key.Suffix())
if err != nil {
return nil, err
}
blockHashes = append(blockHashes, blockHash)
}
return blockHashes, nil
}