[NOD-315] Implement acceptance index (#413)

* [NOD-315] Created acceptanceindex.go including boilerplate.

* [NOD-315] Disallowed calls to notifyChainChanges and getChainFromBlock if the acceptance index is not on.

* [NOD-315] Implemented the acceptance index.

* [NOD-315] Fixed serialization/deserialization. Added test.

* [NOD-315] Fixed/added comments.

* [NOD-315] Fixed copy/paste errors.

* [NOD-315] Added an empty line for readability.
This commit is contained in:
stasatdaglabs 2019-09-19 10:38:33 +03:00 committed by Ori Newman
parent 850876e6a7
commit 1064b5009d
9 changed files with 307 additions and 38 deletions

View File

@ -1454,23 +1454,6 @@ func (dag *BlockDAG) SelectedParentChain(startHash *daghash.Hash) ([]*daghash.Ha
return hashes, nil
}
// BluesTxsAcceptanceData returns the acceptance data of all the transactions that
// were accepted by the block with hash blockHash.
func (dag *BlockDAG) BluesTxsAcceptanceData(blockHash *daghash.Hash) (MultiBlockTxsAcceptanceData, error) {
node := dag.index.LookupNode(blockHash)
if node == nil {
err := fmt.Errorf("block %s is not known", blockHash)
return nil, err
}
_, bluesTxsAcceptanceData, err := dag.pastUTXO(node)
if err != nil {
return nil, err
}
return bluesTxsAcceptanceData, nil
}
// ChainHeight return the chain-height of the selected tip. In other words - it returns
// the length of the dag's selected-parent chain
func (dag *BlockDAG) ChainHeight() uint64 {

View File

@ -0,0 +1,188 @@
package indexers
import (
"bytes"
"encoding/gob"
"fmt"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/dagconfig"
"github.com/daglabs/btcd/database"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
const (
// acceptanceIndexName is the human-readable name for the index.
acceptanceIndexName = "acceptance index"
)
var (
// acceptanceIndexKey is the key of the acceptance index and the db bucket used
// to house it.
acceptanceIndexKey = []byte("acceptanceidx")
)
// AcceptanceIndex implements a txAcceptanceData by block hash index. That is to say,
// it stores a mapping between a block's hash and the set of transactions that the
// block accepts among its blue blocks.
type AcceptanceIndex struct {
db database.DB
}
// NewAcceptanceIndex returns a new instance of an indexer that is used to create a
// mapping between block hashes and their txAcceptanceData.
//
// It implements the Indexer interface which plugs into the IndexManager that in
// turn is used by the blockdag package. This allows the index to be
// seamlessly maintained along with the DAG.
func NewAcceptanceIndex(_ *dagconfig.Params) *AcceptanceIndex {
return &AcceptanceIndex{}
}
// DropAcceptanceIndex drops the acceptance index from the provided database if it
// exists.
func DropAcceptanceIndex(db database.DB, interrupt <-chan struct{}) error {
return dropIndex(db, acceptanceIndexKey, acceptanceIndexName, interrupt)
}
// Key returns the database key to use for the index as a byte slice.
//
// This is part of the Indexer interface.
func (idx *AcceptanceIndex) Key() []byte {
return acceptanceIndexKey
}
// Name returns the human-readable name of the index.
//
// This is part of the Indexer interface.
func (idx *AcceptanceIndex) Name() string {
return acceptanceIndexName
}
// Create is invoked when the indexer manager determines the index needs
// to be created for the first time. It creates the bucket for the
// acceptance index.
//
// This is part of the Indexer interface.
func (idx *AcceptanceIndex) Create(dbTx database.Tx) error {
_, err := dbTx.Metadata().CreateBucket(acceptanceIndexKey)
return err
}
// Init initializes the hash-based acceptance index.
//
// This is part of the Indexer interface.
func (idx *AcceptanceIndex) Init(db database.DB, _ *blockdag.BlockDAG) error {
idx.db = db
return nil
}
// ConnectBlock is invoked by the index manager when a new block has been
// connected to the DAG.
//
// This is part of the Indexer interface.
func (idx *AcceptanceIndex) ConnectBlock(dbTx database.Tx, block *util.Block, _ *blockdag.BlockDAG,
txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData, _ blockdag.MultiBlockTxsAcceptanceData) error {
return idx.dbPutTxsAcceptanceData(dbTx, block.Hash(), txsAcceptanceData)
}
// TxsAcceptanceData returns the acceptance data of all the transactions that
// were accepted by the block with hash blockHash.
func (idx *AcceptanceIndex) TxsAcceptanceData(blockHash *daghash.Hash) (blockdag.MultiBlockTxsAcceptanceData, error) {
var txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData
err := idx.db.View(func(dbTx database.Tx) error {
var err error
txsAcceptanceData, err = idx.dbFetchTxsAcceptanceData(dbTx, blockHash)
return err
})
if err != nil {
return nil, err
}
return txsAcceptanceData, nil
}
func (idx *AcceptanceIndex) dbPutTxsAcceptanceData(dbTx database.Tx, hash *daghash.Hash,
txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error {
serializedTxsAcceptanceData, err := serializeMultiBlockTxsAcceptanceData(txsAcceptanceData)
if err != nil {
return err
}
bucket := dbTx.Metadata().Bucket(acceptanceIndexKey)
return bucket.Put(hash[:], serializedTxsAcceptanceData)
}
func (idx *AcceptanceIndex) dbFetchTxsAcceptanceData(dbTx database.Tx,
hash *daghash.Hash) (blockdag.MultiBlockTxsAcceptanceData, error) {
bucket := dbTx.Metadata().Bucket(acceptanceIndexKey)
serializedTxsAcceptanceData := bucket.Get(hash[:])
if serializedTxsAcceptanceData == nil {
return nil, fmt.Errorf("no entry in the accpetance index for block with hash %s", hash)
}
return deserializeMultiBlockTxsAcceptanceData(serializedTxsAcceptanceData)
}
type serializableTxAcceptanceData struct {
MsgTx wire.MsgTx
IsAccepted bool
}
type serializableBlockTxsAcceptanceData []serializableTxAcceptanceData
type serializableMultiBlockTxsAcceptanceData map[daghash.Hash]serializableBlockTxsAcceptanceData
func serializeMultiBlockTxsAcceptanceData(
txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) ([]byte, error) {
// Convert MultiBlockTxsAcceptanceData to a serializable format
serializableData := make(serializableMultiBlockTxsAcceptanceData, len(txsAcceptanceData))
for hash, blockTxsAcceptanceData := range txsAcceptanceData {
serializableBlockData := make(serializableBlockTxsAcceptanceData, len(blockTxsAcceptanceData))
for i, txAcceptanceData := range blockTxsAcceptanceData {
serializableBlockData[i] = serializableTxAcceptanceData{
MsgTx: *txAcceptanceData.Tx.MsgTx(),
IsAccepted: txAcceptanceData.IsAccepted,
}
}
serializableData[hash] = serializableBlockData
}
// Serialize
var buffer bytes.Buffer
encoder := gob.NewEncoder(&buffer)
err := encoder.Encode(serializableData)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func deserializeMultiBlockTxsAcceptanceData(
serializedTxsAcceptanceData []byte) (blockdag.MultiBlockTxsAcceptanceData, error) {
// Deserialize
buffer := bytes.NewBuffer(serializedTxsAcceptanceData)
decoder := gob.NewDecoder(buffer)
var serializedData serializableMultiBlockTxsAcceptanceData
err := decoder.Decode(&serializedData)
if err != nil {
return nil, err
}
// Convert serializable format to MultiBlockTxsAcceptanceData
txsAcceptanceData := make(blockdag.MultiBlockTxsAcceptanceData, len(serializedData))
for hash, serializableBlockData := range serializedData {
blockTxsAcceptanceData := make(blockdag.BlockTxsAcceptanceData, len(serializableBlockData))
for i, txData := range serializableBlockData {
msgTx := txData.MsgTx
blockTxsAcceptanceData[i] = blockdag.TxAcceptanceData{
Tx: util.NewTx(&msgTx),
IsAccepted: txData.IsAccepted,
}
}
txsAcceptanceData[hash] = blockTxsAcceptanceData
}
return txsAcceptanceData, nil
}

View File

@ -0,0 +1,49 @@
package indexers
import (
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
"reflect"
"testing"
)
func TestSerializationAnDeserialization(t *testing.T) {
txsAcceptanceData := blockdag.MultiBlockTxsAcceptanceData{}
// Create test data
hash, _ := daghash.NewHashFromStr("1111111111111111111111111111111111111111111111111111111111111111")
txIn1 := &wire.TxIn{SignatureScript: []byte{1}, PreviousOutpoint: wire.Outpoint{Index: 1}, Sequence: 0}
txIn2 := &wire.TxIn{SignatureScript: []byte{2}, PreviousOutpoint: wire.Outpoint{Index: 2}, Sequence: 0}
txOut1 := &wire.TxOut{ScriptPubKey: []byte{1}, Value: 10}
txOut2 := &wire.TxOut{ScriptPubKey: []byte{2}, Value: 20}
blockTxsAcceptanceData := blockdag.BlockTxsAcceptanceData{
{
Tx: util.NewTx(wire.NewNativeMsgTx(wire.TxVersion, []*wire.TxIn{txIn1}, []*wire.TxOut{txOut1})),
IsAccepted: true,
},
{
Tx: util.NewTx(wire.NewNativeMsgTx(wire.TxVersion, []*wire.TxIn{txIn2}, []*wire.TxOut{txOut2})),
IsAccepted: false,
},
}
txsAcceptanceData[*hash] = blockTxsAcceptanceData
// Serialize
serializedTxsAcceptanceData, err := serializeMultiBlockTxsAcceptanceData(txsAcceptanceData)
if err != nil {
t.Fatalf("TestSerializationAnDeserialization: serialization failed: %s", err)
}
// Deserialize
deserializedTxsAcceptanceData, err := deserializeMultiBlockTxsAcceptanceData(serializedTxsAcceptanceData)
if err != nil {
t.Fatalf("TestSerializationAnDeserialization: deserialization failed: %s", err)
}
// Check that they're the same
if !reflect.DeepEqual(txsAcceptanceData, deserializedTxsAcceptanceData) {
t.Fatalf("TestSerializationAnDeserialization: original data and deseralize data aren't equal")
}
}

View File

@ -148,6 +148,14 @@ func btcdMain(serverChan chan<- *server.Server) error {
return nil
}
if cfg.DropAcceptanceIndex {
if err := indexers.DropAcceptanceIndex(db, interrupt); err != nil {
btcdLog.Errorf("%s", err)
return err
}
return nil
}
if cfg.DropCfIndex {
if err := indexers.DropCfIndex(db, interrupt); err != nil {
btcdLog.Errorf("%s", err)

View File

@ -58,6 +58,7 @@ const (
ErrRPCDifficulty RPCErrorCode = -5
ErrRPCOutOfRange RPCErrorCode = -1
ErrRPCNoTxInfo RPCErrorCode = -5
ErrRPCNoAcceptanceIndex RPCErrorCode = -5
ErrRPCNoCFIndex RPCErrorCode = -5
ErrRPCNoNewestBlockInfo RPCErrorCode = -5
ErrRPCInvalidTxVout RPCErrorCode = -5

View File

@ -59,6 +59,7 @@ const (
sampleConfigFilename = "sample-btcd.conf"
defaultTxIndex = false
defaultAddrIndex = false
defaultAcceptanceIndex = false
)
var (
@ -159,6 +160,8 @@ type configFlags struct {
DropTxIndex bool `long:"droptxindex" description:"Deletes the hash-based transaction index from the database on start up and then exits."`
AddrIndex bool `long:"addrindex" description:"Maintain a full address-based transaction index which makes the searchrawtransactions RPC available"`
DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."`
AcceptanceIndex bool `long:"acceptanceindex" description:"Maintain a full hash-based acceptance index which makes the getChainByBlock RPC available"`
DropAcceptanceIndex bool `long:"dropacceptanceindex" description:"Deletes the hash-based acceptance index from the database on start up and then exits."`
RelayNonStd bool `long:"relaynonstd" description:"Relay non-standard transactions regardless of the default settings for the active network."`
RejectNonStd bool `long:"rejectnonstd" description:"Reject non-standard transactions regardless of the default settings for the active network."`
Subnetwork string `long:"subnetwork" description:"If subnetwork ID is specified, than node will request and process only payloads from specified subnetwork. And if subnetwork ID is ommited, than payloads of all subnetworks are processed. Subnetworks with IDs 2 through 255 are reserved for future use and are currently not allowed."`
@ -318,6 +321,7 @@ func loadConfig() (*Config, []string, error) {
Generate: defaultGenerate,
TxIndex: defaultTxIndex,
AddrIndex: defaultAddrIndex,
AcceptanceIndex: defaultAcceptanceIndex,
}
// Service options which are only added on Windows.
@ -734,6 +738,16 @@ func loadConfig() (*Config, []string, error) {
return nil, nil, err
}
// --acceptanceindex and --dropacceptanceindex do not mix.
if cfg.AcceptanceIndex && cfg.DropAcceptanceIndex {
err := fmt.Errorf("%s: the --acceptanceindex and --dropacceptanceindex "+
"options may not be activated at the same time",
funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
// Check mining addresses are valid and saved parsed versions.
cfg.MiningAddrs = make([]util.Address, 0, len(cfg.configFlags.MiningAddrs))
for _, strAddr := range cfg.configFlags.MiningAddrs {

View File

@ -280,9 +280,10 @@ type Server struct {
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
// do not need to be protected for concurrent access.
TxIndex *indexers.TxIndex
AddrIndex *indexers.AddrIndex
CfIndex *indexers.CfIndex
TxIndex *indexers.TxIndex
AddrIndex *indexers.AddrIndex
AcceptanceIndex *indexers.AcceptanceIndex
CfIndex *indexers.CfIndex
// cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
// messages for each filter type.
@ -2416,6 +2417,11 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
s.AddrIndex = indexers.NewAddrIndex(dagParams)
indexes = append(indexes, s.AddrIndex)
}
if config.MainConfig().AcceptanceIndex {
indxLog.Info("acceptance index is enabled")
s.AcceptanceIndex = indexers.NewAcceptanceIndex(dagParams)
indexes = append(indexes, s.AcceptanceIndex)
}
if config.MainConfig().EnableCFilters {
indxLog.Info("cf index is enabled")
s.CfIndex = indexers.NewCfIndex(dagParams)

View File

@ -2202,6 +2202,15 @@ func handleGetCFilterHeader(s *Server, cmd interface{}, closeChan <-chan struct{
// handleGetChainFromBlock implements the getChainFromBlock command.
func handleGetChainFromBlock(s *Server, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
if s.cfg.AcceptanceIndex == nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCNoAcceptanceIndex,
Message: "The acceptance index must be " +
"enabled to get the selected parent chain " +
"(specify --acceptanceindex)",
}
}
c := cmd.(*btcjson.GetChainFromBlockCmd)
var startHash *daghash.Hash
if c.StartHash != nil {
@ -2258,7 +2267,7 @@ func handleGetChainFromBlock(s *Server, cmd interface{}, closeChan <-chan struct
func collectChainBlocks(s *Server, selectedParentChain []*daghash.Hash) ([]btcjson.ChainBlock, error) {
chainBlocks := make([]btcjson.ChainBlock, 0, len(selectedParentChain))
for _, hash := range selectedParentChain {
acceptanceData, err := s.cfg.DAG.BluesTxsAcceptanceData(hash)
acceptanceData, err := s.cfg.AcceptanceIndex.TxsAcceptanceData(hash)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInternal.Code,
@ -4287,9 +4296,10 @@ type rpcserverConfig struct {
// These fields define any optional indexes the RPC server can make use
// of to provide additional data when queried.
TxIndex *indexers.TxIndex
AddrIndex *indexers.AddrIndex
CfIndex *indexers.CfIndex
TxIndex *indexers.TxIndex
AddrIndex *indexers.AddrIndex
AcceptanceIndex *indexers.AcceptanceIndex
CfIndex *indexers.CfIndex
}
// setupRPCListeners returns a slice of listeners that are configured for use
@ -4360,20 +4370,21 @@ func NewRPCServer(
return nil, errors.New("RPCS: No valid listen address")
}
cfg := &rpcserverConfig{
Listeners: rpcListeners,
StartupTime: startupTime,
ConnMgr: &rpcConnManager{p2pServer},
SyncMgr: &rpcSyncMgr{p2pServer, p2pServer.SyncManager},
TimeSource: p2pServer.TimeSource,
DAGParams: p2pServer.DAGParams,
DB: db,
TxMemPool: p2pServer.TxMemPool,
Generator: blockTemplateGenerator,
CPUMiner: cpuminer,
TxIndex: p2pServer.TxIndex,
AddrIndex: p2pServer.AddrIndex,
CfIndex: p2pServer.CfIndex,
DAG: p2pServer.DAG,
Listeners: rpcListeners,
StartupTime: startupTime,
ConnMgr: &rpcConnManager{p2pServer},
SyncMgr: &rpcSyncMgr{p2pServer, p2pServer.SyncManager},
TimeSource: p2pServer.TimeSource,
DAGParams: p2pServer.DAGParams,
DB: db,
TxMemPool: p2pServer.TxMemPool,
Generator: blockTemplateGenerator,
CPUMiner: cpuminer,
TxIndex: p2pServer.TxIndex,
AddrIndex: p2pServer.AddrIndex,
AcceptanceIndex: p2pServer.AcceptanceIndex,
CfIndex: p2pServer.CfIndex,
DAG: p2pServer.DAG,
}
rpc := Server{
cfg: *cfg,

View File

@ -1474,6 +1474,15 @@ func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
// handleNotifyChainChanges implements the notifyChainChanges command extension for
// websocket connections.
func handleNotifyChainChanges(wsc *wsClient, icmd interface{}) (interface{}, error) {
if wsc.server.cfg.AcceptanceIndex == nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCNoAcceptanceIndex,
Message: "The acceptance index must be " +
"enabled to receive chain changes " +
"(specify --acceptanceindex)",
}
}
wsc.server.ntfnMgr.RegisterChainChanges(wsc)
return nil, nil
}