Add cache to block window (#1948)

* Add cache to block window

* Copy the window heap slice with the right capacity

* Use WindowHeapSliceStore

* Use the selected parent window as a basis (and some comments and variable renames)

* Clone slice on newSizedUpHeapFromSlice

* Rename isNotFoundError->currentIsNonTrustedBlock

* Increase windowHeapSliceStore cache size to 2000 and some cosmetic changes
This commit is contained in:
Ori Newman 2022-02-20 16:52:36 +02:00 committed by GitHub
parent 28d0f1ea2e
commit 1c18a49992
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 308 additions and 37 deletions

View File

@ -0,0 +1,44 @@
package blockwindowheapslicestore
import (
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
)
type shardKey struct {
hash externalapi.DomainHash
windowSize int
}
type blockWindowHeapSliceStagingShard struct {
store *blockWindowHeapSliceStore
toAdd map[shardKey][]*externalapi.BlockGHOSTDAGDataHashPair
}
func (bss *blockWindowHeapSliceStore) stagingShard(stagingArea *model.StagingArea) *blockWindowHeapSliceStagingShard {
return stagingArea.GetOrCreateShard(bss.shardID, func() model.StagingShard {
return &blockWindowHeapSliceStagingShard{
store: bss,
toAdd: make(map[shardKey][]*externalapi.BlockGHOSTDAGDataHashPair),
}
}).(*blockWindowHeapSliceStagingShard)
}
func (bsss *blockWindowHeapSliceStagingShard) Commit(_ model.DBTransaction) error {
for key, heapSlice := range bsss.toAdd {
bsss.store.cache.Add(&key.hash, key.windowSize, heapSlice)
}
return nil
}
func (bsss *blockWindowHeapSliceStagingShard) isStaged() bool {
return len(bsss.toAdd) != 0
}
func newShardKey(hash *externalapi.DomainHash, windowSize int) shardKey {
return shardKey{
hash: *hash,
windowSize: windowSize,
}
}

View File

@ -0,0 +1,47 @@
package blockwindowheapslicestore
import (
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs"
"github.com/kaspanet/kaspad/infrastructure/db/database"
"github.com/kaspanet/kaspad/util/staging"
"github.com/pkg/errors"
)
type blockWindowHeapSliceStore struct {
shardID model.StagingShardID
cache *lrucachehashandwindowsizetoblockghostdagdatahashpairs.LRUCache
}
// New instantiates a new WindowHeapSliceStore
func New(cacheSize int, preallocate bool) model.WindowHeapSliceStore {
return &blockWindowHeapSliceStore{
shardID: staging.GenerateShardingID(),
cache: lrucachehashandwindowsizetoblockghostdagdatahashpairs.New(cacheSize, preallocate),
}
}
// Stage stages the given blockStatus for the given blockHash
func (bss *blockWindowHeapSliceStore) Stage(stagingArea *model.StagingArea, blockHash *externalapi.DomainHash, windowSize int, heapSlice []*externalapi.BlockGHOSTDAGDataHashPair) {
stagingShard := bss.stagingShard(stagingArea)
stagingShard.toAdd[newShardKey(blockHash, windowSize)] = heapSlice
}
func (bss *blockWindowHeapSliceStore) IsStaged(stagingArea *model.StagingArea) bool {
return bss.stagingShard(stagingArea).isStaged()
}
func (bss *blockWindowHeapSliceStore) Get(stagingArea *model.StagingArea, blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, error) {
stagingShard := bss.stagingShard(stagingArea)
if heapSlice, ok := stagingShard.toAdd[newShardKey(blockHash, windowSize)]; ok {
return heapSlice, nil
}
if heapSlice, ok := bss.cache.Get(blockHash, windowSize); ok {
return heapSlice, nil
}
return nil, errors.Wrap(database.ErrNotFound, "Window heap slice not found")
}

View File

@ -1,6 +1,7 @@
package consensus package consensus
import ( import (
"github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore"
"github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore" "github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore"
"github.com/kaspanet/kaspad/domain/consensus/model" "github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/processes/blockparentbuilder" "github.com/kaspanet/kaspad/domain/consensus/processes/blockparentbuilder"
@ -144,9 +145,10 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas
finalityStore := finalitystore.New(prefixBucket, 200, preallocateCaches) finalityStore := finalitystore.New(prefixBucket, 200, preallocateCaches)
headersSelectedChainStore := headersselectedchainstore.New(prefixBucket, pruningWindowSizeForCaches, preallocateCaches) headersSelectedChainStore := headersselectedchainstore.New(prefixBucket, pruningWindowSizeForCaches, preallocateCaches)
daaBlocksStore := daablocksstore.New(prefixBucket, pruningWindowSizeForCaches, int(config.FinalityDepth()), preallocateCaches) daaBlocksStore := daablocksstore.New(prefixBucket, pruningWindowSizeForCaches, int(config.FinalityDepth()), preallocateCaches)
windowHeapSliceStore := blockwindowheapslicestore.New(2000, preallocateCaches)
blockRelationStores, reachabilityDataStores, ghostdagDataStores := dagStores(config, prefixBucket, pruningWindowSizePlusFinalityDepthForCache, pruningWindowSizeForCaches, preallocateCaches) blockRelationStores, reachabilityDataStores, ghostdagDataStores := dagStores(config, prefixBucket, pruningWindowSizePlusFinalityDepthForCache, pruningWindowSizeForCaches, preallocateCaches)
reachabilityManagers, dagTopologyManagers, ghostdagManagers, dagTraversalManagers := f.dagProcesses(config, dbManager, blockHeaderStore, daaWindowStore, blockRelationStores, reachabilityDataStores, ghostdagDataStores) reachabilityManagers, dagTopologyManagers, ghostdagManagers, dagTraversalManagers := f.dagProcesses(config, dbManager, blockHeaderStore, daaWindowStore, windowHeapSliceStore, blockRelationStores, reachabilityDataStores, ghostdagDataStores)
blockRelationStore := blockRelationStores[0] blockRelationStore := blockRelationStores[0]
reachabilityDataStore := reachabilityDataStores[0] reachabilityDataStore := reachabilityDataStores[0]
@ -600,6 +602,7 @@ func (f *factory) dagProcesses(config *Config,
dbManager model.DBManager, dbManager model.DBManager,
blockHeaderStore model.BlockHeaderStore, blockHeaderStore model.BlockHeaderStore,
daaWindowStore model.BlocksWithTrustedDataDAAWindowStore, daaWindowStore model.BlocksWithTrustedDataDAAWindowStore,
windowHeapSliceStore model.WindowHeapSliceStore,
blockRelationStores []model.BlockRelationStore, blockRelationStores []model.BlockRelationStore,
reachabilityDataStores []model.ReachabilityDataStore, reachabilityDataStores []model.ReachabilityDataStore,
ghostdagDataStores []model.GHOSTDAGDataStore) ( ghostdagDataStores []model.GHOSTDAGDataStore) (
@ -641,6 +644,7 @@ func (f *factory) dagProcesses(config *Config,
reachabilityDataStores[i], reachabilityDataStores[i],
ghostdagManagers[i], ghostdagManagers[i],
daaWindowStore, daaWindowStore,
windowHeapSliceStore,
config.GenesisHash, config.GenesisHash,
config.DifficultyAdjustmentWindowSize) config.DifficultyAdjustmentWindowSize)
} }

View File

@ -0,0 +1,11 @@
package model
import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
// WindowHeapSliceStore caches the slices that are needed for the heap implementation of DAGTraversalManager.BlockWindow
type WindowHeapSliceStore interface {
Store
Stage(stagingArea *StagingArea, blockHash *externalapi.DomainHash, windowSize int, pairs []*externalapi.BlockGHOSTDAGDataHashPair)
IsStaged(stagingArea *StagingArea) bool
Get(stagingArea *StagingArea, blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, error)
}

View File

@ -152,6 +152,18 @@ func (dtm *dagTraversalManager) newSizedUpHeap(stagingArea *model.StagingArea, c
return &h return &h
} }
func (dtm *dagTraversalManager) newSizedUpHeapFromSlice(stagingArea *model.StagingArea, slice []*externalapi.BlockGHOSTDAGDataHashPair) *sizedUpBlockHeap {
sliceClone := make([]*externalapi.BlockGHOSTDAGDataHashPair, len(slice), cap(slice))
copy(sliceClone, slice)
h := sizedUpBlockHeap{
impl: upHeap{baseHeap{slice: sliceClone, ghostdagManager: dtm.ghostdagManager}},
ghostdagStore: dtm.ghostdagDataStore,
dbContext: dtm.databaseContext,
stagingArea: stagingArea,
}
return &h
}
// len returns the length of this heap // len returns the length of this heap
func (sbh *sizedUpBlockHeap) len() int { func (sbh *sizedUpBlockHeap) len() int {
return sbh.impl.Len() return sbh.impl.Len()

View File

@ -18,6 +18,7 @@ type dagTraversalManager struct {
daaWindowStore model.BlocksWithTrustedDataDAAWindowStore daaWindowStore model.BlocksWithTrustedDataDAAWindowStore
genesisHash *externalapi.DomainHash genesisHash *externalapi.DomainHash
difficultyAdjustmentWindowSize int difficultyAdjustmentWindowSize int
windowHeapSliceStore model.WindowHeapSliceStore
} }
// New instantiates a new DAGTraversalManager // New instantiates a new DAGTraversalManager
@ -28,6 +29,7 @@ func New(
reachabilityDataStore model.ReachabilityDataStore, reachabilityDataStore model.ReachabilityDataStore,
ghostdagManager model.GHOSTDAGManager, ghostdagManager model.GHOSTDAGManager,
daaWindowStore model.BlocksWithTrustedDataDAAWindowStore, daaWindowStore model.BlocksWithTrustedDataDAAWindowStore,
windowHeapSliceStore model.WindowHeapSliceStore,
genesisHash *externalapi.DomainHash, genesisHash *externalapi.DomainHash,
difficultyAdjustmentWindowSize int) model.DAGTraversalManager { difficultyAdjustmentWindowSize int) model.DAGTraversalManager {
return &dagTraversalManager{ return &dagTraversalManager{
@ -40,6 +42,7 @@ func New(
genesisHash: genesisHash, genesisHash: genesisHash,
difficultyAdjustmentWindowSize: difficultyAdjustmentWindowSize, difficultyAdjustmentWindowSize: difficultyAdjustmentWindowSize,
windowHeapSliceStore: windowHeapSliceStore,
} }
} }

View File

@ -16,7 +16,7 @@ func (dtm *dagTraversalManager) DAABlockWindow(stagingArea *model.StagingArea, h
func (dtm *dagTraversalManager) BlockWindow(stagingArea *model.StagingArea, highHash *externalapi.DomainHash, func (dtm *dagTraversalManager) BlockWindow(stagingArea *model.StagingArea, highHash *externalapi.DomainHash,
windowSize int) ([]*externalapi.DomainHash, error) { windowSize int) ([]*externalapi.DomainHash, error) {
windowHeap, err := dtm.calculateBlockWindowHeap(stagingArea, highHash, windowSize) windowHeap, err := dtm.blockWindowHeap(stagingArea, highHash, windowSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -28,6 +28,28 @@ func (dtm *dagTraversalManager) BlockWindow(stagingArea *model.StagingArea, high
return window, nil return window, nil
} }
func (dtm *dagTraversalManager) blockWindowHeap(stagingArea *model.StagingArea,
highHash *externalapi.DomainHash, windowSize int) (*sizedUpBlockHeap, error) {
windowHeapSlice, err := dtm.windowHeapSliceStore.Get(stagingArea, highHash, windowSize)
sliceNotCached := database.IsNotFoundError(err)
if !sliceNotCached && err != nil {
return nil, err
}
if !sliceNotCached {
return dtm.newSizedUpHeapFromSlice(stagingArea, windowHeapSlice), nil
}
heap, err := dtm.calculateBlockWindowHeap(stagingArea, highHash, windowSize)
if err != nil {
return nil, err
}
if !highHash.Equal(model.VirtualBlockHash) {
dtm.windowHeapSliceStore.Stage(stagingArea, highHash, windowSize, heap.impl.slice)
}
return heap, nil
}
func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.StagingArea, func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.StagingArea,
highHash *externalapi.DomainHash, windowSize int) (*sizedUpBlockHeap, error) { highHash *externalapi.DomainHash, windowSize int) (*sizedUpBlockHeap, error) {
@ -45,18 +67,54 @@ func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.Stag
return nil, err return nil, err
} }
// If the block has a trusted DAA window attached, we just take it as is and don't use cache of selected parent to
// build the window. This is because tryPushMergeSet might not be able to find all the GHOSTDAG data that is
// associated with the block merge set.
_, err = dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, 0)
isNonTrustedBlock := database.IsNotFoundError(err)
if !isNonTrustedBlock && err != nil {
return nil, err
}
if isNonTrustedBlock && currentGHOSTDAGData.SelectedParent() != nil {
windowHeapSlice, err := dtm.windowHeapSliceStore.Get(stagingArea, currentGHOSTDAGData.SelectedParent(), windowSize)
selectedParentNotCached := database.IsNotFoundError(err)
if !selectedParentNotCached && err != nil {
return nil, err
}
if !selectedParentNotCached {
windowHeap := dtm.newSizedUpHeapFromSlice(stagingArea, windowHeapSlice)
if !currentGHOSTDAGData.SelectedParent().Equal(dtm.genesisHash) {
selectedParentGHOSTDAGData, err := dtm.ghostdagDataStore.Get(
dtm.databaseContext, stagingArea, currentGHOSTDAGData.SelectedParent(), false)
if err != nil {
return nil, err
}
_, err = dtm.tryPushMergeSet(windowHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData)
if err != nil {
return nil, err
}
}
return windowHeap, nil
}
}
// Walk down the chain until you finish or find a trusted block and then take complete the rest
// of the window with the trusted window.
for { for {
if currentGHOSTDAGData.SelectedParent().Equal(dtm.genesisHash) { if currentGHOSTDAGData.SelectedParent().Equal(dtm.genesisHash) {
break break
} }
_, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, 0) _, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, 0)
isNotFoundError := database.IsNotFoundError(err) currentIsNonTrustedBlock := database.IsNotFoundError(err)
if !isNotFoundError && err != nil { if !currentIsNonTrustedBlock && err != nil {
return nil, err return nil, err
} }
if !isNotFoundError { if !currentIsNonTrustedBlock {
for i := uint64(0); ; i++ { for i := uint64(0); ; i++ {
daaBlock, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, i) daaBlock, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, i)
if database.IsNotFoundError(err) { if database.IsNotFoundError(err) {
@ -83,47 +141,60 @@ func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.Stag
if err != nil { if err != nil {
return nil, err return nil, err
} }
added, err := windowHeap.tryPushWithGHOSTDAGData(currentGHOSTDAGData.SelectedParent(), selectedParentGHOSTDAGData)
done, err := dtm.tryPushMergeSet(windowHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if done {
// If the window is full and the selected parent is less than the minimum then we break
// because this means that there cannot be any more blocks in the past with higher blueWork
if !added {
break break
} }
// Now we go over the merge set.
// Remove the SP from the blue merge set because we already added it.
mergeSetBlues := currentGHOSTDAGData.MergeSetBlues()[1:]
// Go over the merge set in reverse because it's ordered in reverse by blueWork.
for i := len(mergeSetBlues) - 1; i >= 0; i-- {
added, err := windowHeap.tryPush(mergeSetBlues[i])
if err != nil {
return nil, err
}
// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
if !added {
break
}
}
mergeSetReds := currentGHOSTDAGData.MergeSetReds()
for i := len(mergeSetReds) - 1; i >= 0; i-- {
added, err := windowHeap.tryPush(mergeSetReds[i])
if err != nil {
return nil, err
}
// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
if !added {
break
}
}
current = currentGHOSTDAGData.SelectedParent() current = currentGHOSTDAGData.SelectedParent()
currentGHOSTDAGData = selectedParentGHOSTDAGData currentGHOSTDAGData = selectedParentGHOSTDAGData
} }
return windowHeap, nil return windowHeap, nil
} }
func (dtm *dagTraversalManager) tryPushMergeSet(windowHeap *sizedUpBlockHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData *externalapi.BlockGHOSTDAGData) (bool, error) {
added, err := windowHeap.tryPushWithGHOSTDAGData(currentGHOSTDAGData.SelectedParent(), selectedParentGHOSTDAGData)
if err != nil {
return false, err
}
// If the window is full and the selected parent is less than the minimum then we break
// because this means that there cannot be any more blocks in the past with higher blueWork
if !added {
return true, nil
}
// Now we go over the merge set.
// Remove the SP from the blue merge set because we already added it.
mergeSetBlues := currentGHOSTDAGData.MergeSetBlues()[1:]
// Go over the merge set in reverse because it's ordered in reverse by blueWork.
for i := len(mergeSetBlues) - 1; i >= 0; i-- {
added, err := windowHeap.tryPush(mergeSetBlues[i])
if err != nil {
return false, err
}
// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
if !added {
break
}
}
mergeSetReds := currentGHOSTDAGData.MergeSetReds()
for i := len(mergeSetReds) - 1; i >= 0; i-- {
added, err := windowHeap.tryPush(mergeSetReds[i])
if err != nil {
return false, err
}
// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
if !added {
break
}
}
return false, nil
}

View File

@ -0,0 +1,79 @@
package lrucachehashandwindowsizetoblockghostdagdatahashpairs
import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
type lruKey struct {
blockHash externalapi.DomainHash
windowSize int
}
func newKey(blockHash *externalapi.DomainHash, windowSize int) lruKey {
return lruKey{
blockHash: *blockHash,
windowSize: windowSize,
}
}
// LRUCache is a least-recently-used cache from
// lruKey to *externalapi.BlockGHOSTDAGDataHashPair
type LRUCache struct {
cache map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair
capacity int
}
// New creates a new LRUCache
func New(capacity int, preallocate bool) *LRUCache {
var cache map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair
if preallocate {
cache = make(map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair, capacity+1)
} else {
cache = make(map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair)
}
return &LRUCache{
cache: cache,
capacity: capacity,
}
}
// Add adds an entry to the LRUCache
func (c *LRUCache) Add(blockHash *externalapi.DomainHash, windowSize int, value []*externalapi.BlockGHOSTDAGDataHashPair) {
key := newKey(blockHash, windowSize)
c.cache[key] = value
if len(c.cache) > c.capacity {
c.evictRandom()
}
}
// Get returns the entry for the given key, or (nil, false) otherwise
func (c *LRUCache) Get(blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, bool) {
key := newKey(blockHash, windowSize)
value, ok := c.cache[key]
if !ok {
return nil, false
}
return value, true
}
// Has returns whether the LRUCache contains the given key
func (c *LRUCache) Has(blockHash *externalapi.DomainHash, windowSize int) bool {
key := newKey(blockHash, windowSize)
_, ok := c.cache[key]
return ok
}
// Remove removes the entry for the the given key. Does nothing if
// the entry does not exist
func (c *LRUCache) Remove(blockHash *externalapi.DomainHash, windowSize int) {
key := newKey(blockHash, windowSize)
delete(c.cache, key)
}
func (c *LRUCache) evictRandom() {
var keyToEvict lruKey
for key := range c.cache {
keyToEvict = key
break
}
c.Remove(&keyToEvict.blockHash, keyToEvict.windowSize)
}