mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-09-13 13:00:10 +00:00

* [NOD-532] Change chain to DAG in the root package. * [NOD-532] Change chain to DAG in checkpoints.go. * [NOD-532] Change chain to DAG in blockdag. * [NOD-532] Change chain to DAG in cmd. * [NOD-532] Change chain to DAG in dagconfig. * [NOD-532] Change chain to DAG in database. * [NOD-532] Change chain to DAG in mempool. * [NOD-532] Change chain to DAG in mempool. * [NOD-532] Change chain to DAG in netsync. * [NOD-532] Change chain to DAG in rpcclient. * [NOD-532] Change chain to DAG in server. * [NOD-532] Change chain to DAG in txscript. * [NOD-532] Change chain to DAG in util. * [NOD-532] Change chain to DAG in wire. * [NOD-532] Remove block heights in dagio.go examples. * [NOD-532] Rename fakeChain to fakeDAG. * [NOD-532] Fix comments, remove unused EnableBCInfoHacks flag. * [NOD-532] Fix comments and variable names. * [NOD-532] Fix comments. * [NOD-532] Fix merge errors. * [NOD-532] Formatted project.
390 lines
12 KiB
Go
390 lines
12 KiB
Go
// Copyright (c) 2016 The btcsuite developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package indexers
|
|
|
|
import (
|
|
"github.com/kaspanet/kaspad/blockdag"
|
|
"github.com/kaspanet/kaspad/database"
|
|
"github.com/kaspanet/kaspad/util"
|
|
"github.com/kaspanet/kaspad/util/daghash"
|
|
)
|
|
|
|
var (
|
|
// indexTipsBucketName is the name of the db bucket used to house the
|
|
// current tip of each index.
|
|
indexTipsBucketName = []byte("idxtips")
|
|
|
|
indexCurrentBlockIDBucketName = []byte("idxcurrentblockid")
|
|
)
|
|
|
|
// Manager defines an index manager that manages multiple optional indexes and
|
|
// implements the blockdag.IndexManager interface so it can be seamlessly
|
|
// plugged into normal DAG processing.
|
|
type Manager struct {
|
|
db database.DB
|
|
enabledIndexes []Indexer
|
|
}
|
|
|
|
// Ensure the Manager type implements the blockdag.IndexManager interface.
|
|
var _ blockdag.IndexManager = (*Manager)(nil)
|
|
|
|
// indexDropKey returns the key for an index which indicates it is in the
|
|
// process of being dropped.
|
|
func indexDropKey(idxKey []byte) []byte {
|
|
dropKey := make([]byte, len(idxKey)+1)
|
|
dropKey[0] = 'd'
|
|
copy(dropKey[1:], idxKey)
|
|
return dropKey
|
|
}
|
|
|
|
// maybeFinishDrops determines if each of the enabled indexes are in the middle
|
|
// of being dropped and finishes dropping them when the are. This is necessary
|
|
// because dropping and index has to be done in several atomic steps rather than
|
|
// one big atomic step due to the massive number of entries.
|
|
func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) error {
|
|
indexNeedsDrop := make([]bool, len(m.enabledIndexes))
|
|
err := m.db.View(func(dbTx database.Tx) error {
|
|
// None of the indexes needs to be dropped if the index tips
|
|
// bucket hasn't been created yet.
|
|
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
|
if indexesBucket == nil {
|
|
return nil
|
|
}
|
|
|
|
// Mark the indexer as requiring a drop if one is already in
|
|
// progress.
|
|
for i, indexer := range m.enabledIndexes {
|
|
dropKey := indexDropKey(indexer.Key())
|
|
if indexesBucket.Get(dropKey) != nil {
|
|
indexNeedsDrop[i] = true
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if interruptRequested(interrupt) {
|
|
return errInterruptRequested
|
|
}
|
|
|
|
// Finish dropping any of the enabled indexes that are already in the
|
|
// middle of being dropped.
|
|
for i, indexer := range m.enabledIndexes {
|
|
if !indexNeedsDrop[i] {
|
|
continue
|
|
}
|
|
|
|
log.Infof("Resuming %s drop", indexer.Name())
|
|
err := dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// maybeCreateIndexes determines if each of the enabled indexes have already
|
|
// been created and creates them if not.
|
|
func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error {
|
|
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
|
for _, indexer := range m.enabledIndexes {
|
|
// Nothing to do if the index tip already exists.
|
|
idxKey := indexer.Key()
|
|
if indexesBucket.Get(idxKey) != nil {
|
|
continue
|
|
}
|
|
|
|
// The tip for the index does not exist, so create it and
|
|
// invoke the create callback for the index so it can perform
|
|
// any one-time initialization it requires.
|
|
if err := indexer.Create(dbTx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO (Mike): this is temporary solution to prevent node from not starting
|
|
// because it thinks indexers are not initialized.
|
|
// Indexers, however, do not work properly, and a general solution to their work operation is required
|
|
indexesBucket.Put(idxKey, []byte{0})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Init initializes the enabled indexes. This is called during DAG
|
|
// initialization and primarily consists of catching up all indexes to the
|
|
// current tips. This is necessary since each index can be disabled
|
|
// and re-enabled at any time and attempting to catch-up indexes at the same
|
|
// time new blocks are being downloaded would lead to an overall longer time to
|
|
// catch up due to the I/O contention.
|
|
//
|
|
// This is part of the blockdag.IndexManager interface.
|
|
func (m *Manager) Init(db database.DB, blockDAG *blockdag.BlockDAG, interrupt <-chan struct{}) error {
|
|
// Nothing to do when no indexes are enabled.
|
|
if len(m.enabledIndexes) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if interruptRequested(interrupt) {
|
|
return errInterruptRequested
|
|
}
|
|
|
|
m.db = db
|
|
|
|
// Finish and drops that were previously interrupted.
|
|
if err := m.maybeFinishDrops(interrupt); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create the initial state for the indexes as needed.
|
|
err := m.db.Update(func(dbTx database.Tx) error {
|
|
// Create the bucket for the current tips as needed.
|
|
meta := dbTx.Metadata()
|
|
_, err := meta.CreateBucketIfNotExists(indexTipsBucketName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := meta.CreateBucketIfNotExists(indexCurrentBlockIDBucketName); err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.maybeCreateIndexes(dbTx)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Initialize each of the enabled indexes.
|
|
for _, indexer := range m.enabledIndexes {
|
|
if err := indexer.Init(db, blockDAG); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return m.recoverIfNeeded()
|
|
}
|
|
|
|
// recoverIfNeeded checks if the node worked for some time
|
|
// without one of the current enabled indexes, and if it's
|
|
// the case, recovers the missing blocks from the index.
|
|
func (m *Manager) recoverIfNeeded() error {
|
|
return m.db.Update(func(dbTx database.Tx) error {
|
|
lastKnownBlockID := blockdag.DBFetchCurrentBlockID(dbTx)
|
|
for _, indexer := range m.enabledIndexes {
|
|
serializedCurrentIdxBlockID := dbTx.Metadata().Bucket(indexCurrentBlockIDBucketName).Get(indexer.Key())
|
|
currentIdxBlockID := uint64(0)
|
|
if serializedCurrentIdxBlockID != nil {
|
|
currentIdxBlockID = blockdag.DeserializeBlockID(serializedCurrentIdxBlockID)
|
|
}
|
|
if lastKnownBlockID > currentIdxBlockID {
|
|
err := indexer.Recover(dbTx, currentIdxBlockID, lastKnownBlockID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ConnectBlock must be invoked when a block is added to the DAG. It
|
|
// keeps track of the state of each index it is managing, performs some sanity
|
|
// checks, and invokes each indexer.
|
|
//
|
|
// This is part of the blockdag.IndexManager interface.
|
|
func (m *Manager) ConnectBlock(dbTx database.Tx, block *util.Block, blockID uint64, dag *blockdag.BlockDAG,
|
|
txsAcceptanceData blockdag.MultiBlockTxsAcceptanceData, virtualTxsAcceptanceData blockdag.MultiBlockTxsAcceptanceData) error {
|
|
|
|
// Call each of the currently active optional indexes with the block
|
|
// being connected so they can update accordingly.
|
|
for _, index := range m.enabledIndexes {
|
|
// Notify the indexer with the connected block so it can index it.
|
|
if err := index.ConnectBlock(dbTx, block, blockID, dag, txsAcceptanceData, virtualTxsAcceptanceData); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Add the new block ID index entry for the block being connected and
|
|
// update the current internal block ID accordingly.
|
|
err := m.updateIndexersWithCurrentBlockID(dbTx, block.Hash(), blockID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) updateIndexersWithCurrentBlockID(dbTx database.Tx, blockHash *daghash.Hash, blockID uint64) error {
|
|
serializedBlockID := blockdag.SerializeBlockID(blockID)
|
|
for _, index := range m.enabledIndexes {
|
|
err := dbTx.Metadata().Bucket(indexCurrentBlockIDBucketName).Put(index.Key(), serializedBlockID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NewManager returns a new index manager with the provided indexes enabled.
|
|
//
|
|
// The manager returned satisfies the blockdag.IndexManager interface and thus
|
|
// cleanly plugs into the normal blockdag processing path.
|
|
func NewManager(enabledIndexes []Indexer) *Manager {
|
|
return &Manager{
|
|
enabledIndexes: enabledIndexes,
|
|
}
|
|
}
|
|
|
|
// dropIndex drops the passed index from the database. Since indexes can be
|
|
// massive, it deletes the index in multiple database transactions in order to
|
|
// keep memory usage to reasonable levels. It also marks the drop in progress
|
|
// so the drop can be resumed if it is stopped before it is done before the
|
|
// index can be used again.
|
|
func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
|
|
// Nothing to do if the index doesn't already exist.
|
|
var needsDelete bool
|
|
err := db.View(func(dbTx database.Tx) error {
|
|
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
|
if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
|
|
needsDelete = true
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !needsDelete {
|
|
log.Infof("Not dropping %s because it does not exist", idxName)
|
|
return nil
|
|
}
|
|
|
|
// Mark that the index is in the process of being dropped so that it
|
|
// can be resumed on the next start if interrupted before the process is
|
|
// complete.
|
|
log.Infof("Dropping all %s entries. This might take a while...",
|
|
idxName)
|
|
err = db.Update(func(dbTx database.Tx) error {
|
|
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
|
return indexesBucket.Put(indexDropKey(idxKey), idxKey)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Since the indexes can be so large, attempting to simply delete
|
|
// the bucket in a single database transaction would result in massive
|
|
// memory usage and likely crash many systems due to ulimits. In order
|
|
// to avoid this, use a cursor to delete a maximum number of entries out
|
|
// of the bucket at a time. Recurse buckets depth-first to delete any
|
|
// sub-buckets.
|
|
const maxDeletions = 2000000
|
|
var totalDeleted uint64
|
|
|
|
// Recurse through all buckets in the index, cataloging each for
|
|
// later deletion.
|
|
var subBuckets [][][]byte
|
|
var subBucketClosure func(database.Tx, []byte, [][]byte) error
|
|
subBucketClosure = func(dbTx database.Tx,
|
|
subBucket []byte, tlBucket [][]byte) error {
|
|
// Get full bucket name and append to subBuckets for later
|
|
// deletion.
|
|
var bucketName [][]byte
|
|
if (tlBucket == nil) || (len(tlBucket) == 0) {
|
|
bucketName = append(bucketName, subBucket)
|
|
} else {
|
|
bucketName = append(tlBucket, subBucket)
|
|
}
|
|
subBuckets = append(subBuckets, bucketName)
|
|
// Recurse sub-buckets to append to subBuckets slice.
|
|
bucket := dbTx.Metadata()
|
|
for _, subBucketName := range bucketName {
|
|
bucket = bucket.Bucket(subBucketName)
|
|
}
|
|
return bucket.ForEachBucket(func(k []byte) error {
|
|
return subBucketClosure(dbTx, k, bucketName)
|
|
})
|
|
}
|
|
|
|
// Call subBucketClosure with top-level bucket.
|
|
err = db.View(func(dbTx database.Tx) error {
|
|
return subBucketClosure(dbTx, idxKey, nil)
|
|
})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Iterate through each sub-bucket in reverse, deepest-first, deleting
|
|
// all keys inside them and then dropping the buckets themselves.
|
|
for i := range subBuckets {
|
|
bucketName := subBuckets[len(subBuckets)-1-i]
|
|
// Delete maxDeletions key/value pairs at a time.
|
|
for numDeleted := maxDeletions; numDeleted == maxDeletions; {
|
|
numDeleted = 0
|
|
err := db.Update(func(dbTx database.Tx) error {
|
|
subBucket := dbTx.Metadata()
|
|
for _, subBucketName := range bucketName {
|
|
subBucket = subBucket.Bucket(subBucketName)
|
|
}
|
|
cursor := subBucket.Cursor()
|
|
for ok := cursor.First(); ok; ok = cursor.Next() &&
|
|
numDeleted < maxDeletions {
|
|
|
|
if err := cursor.Delete(); err != nil {
|
|
return err
|
|
}
|
|
numDeleted++
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if numDeleted > 0 {
|
|
totalDeleted += uint64(numDeleted)
|
|
log.Infof("Deleted %d keys (%d total) from %s",
|
|
numDeleted, totalDeleted, idxName)
|
|
}
|
|
}
|
|
|
|
if interruptRequested(interrupt) {
|
|
return errInterruptRequested
|
|
}
|
|
|
|
// Drop the bucket itself.
|
|
err = db.Update(func(dbTx database.Tx) error {
|
|
bucket := dbTx.Metadata()
|
|
for j := 0; j < len(bucketName)-1; j++ {
|
|
bucket = bucket.Bucket(bucketName[j])
|
|
}
|
|
return bucket.DeleteBucket(bucketName[len(bucketName)-1])
|
|
})
|
|
}
|
|
|
|
// Remove the index tip, index bucket, and in-progress drop flag now
|
|
// that all index entries have been removed.
|
|
err = db.Update(func(dbTx database.Tx) error {
|
|
meta := dbTx.Metadata()
|
|
indexesBucket := meta.Bucket(indexTipsBucketName)
|
|
if err := indexesBucket.Delete(idxKey); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := meta.Bucket(indexCurrentBlockIDBucketName).Delete(idxKey); err != nil {
|
|
return err
|
|
}
|
|
|
|
return indexesBucket.Delete(indexDropKey(idxKey))
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Dropped %s", idxName)
|
|
return nil
|
|
}
|