diff --git a/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_staging_shard.go b/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_staging_shard.go
new file mode 100644
index 000000000..4dde5ac04
--- /dev/null
+++ b/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_staging_shard.go
@@ -0,0 +1,44 @@
+package blockwindowheapslicestore
+
+import (
+	"github.com/kaspanet/kaspad/domain/consensus/model"
+	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
+)
+
+type shardKey struct {
+	hash       externalapi.DomainHash
+	windowSize int
+}
+
+type blockWindowHeapSliceStagingShard struct {
+	store *blockWindowHeapSliceStore
+	toAdd map[shardKey][]*externalapi.BlockGHOSTDAGDataHashPair
+}
+
+func (bss *blockWindowHeapSliceStore) stagingShard(stagingArea *model.StagingArea) *blockWindowHeapSliceStagingShard {
+	return stagingArea.GetOrCreateShard(bss.shardID, func() model.StagingShard {
+		return &blockWindowHeapSliceStagingShard{
+			store: bss,
+			toAdd: make(map[shardKey][]*externalapi.BlockGHOSTDAGDataHashPair),
+		}
+	}).(*blockWindowHeapSliceStagingShard)
+}
+
+func (bsss *blockWindowHeapSliceStagingShard) Commit(_ model.DBTransaction) error {
+	for key, heapSlice := range bsss.toAdd {
+		bsss.store.cache.Add(&key.hash, key.windowSize, heapSlice)
+	}
+
+	return nil
+}
+
+func (bsss *blockWindowHeapSliceStagingShard) isStaged() bool {
+	return len(bsss.toAdd) != 0
+}
+
+func newShardKey(hash *externalapi.DomainHash, windowSize int) shardKey {
+	return shardKey{
+		hash:       *hash,
+		windowSize: windowSize,
+	}
+}
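A side note on the shardKey type above: newShardKey copies the hash out of the pointer so that the key is a plain comparable value, which is what Go map keys require; two keys built from different pointers to equal hashes therefore address the same toAdd entry. A minimal standalone sketch of that behaviour, with made-up hash bytes and window size (the local shardKey below only mirrors the unexported type above):

package main

import (
	"fmt"

	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
)

// shardKey mirrors the unexported key type used by the staging shard above.
type shardKey struct {
	hash       externalapi.DomainHash
	windowSize int
}

func main() {
	// Two distinct pointers to equal hashes (the byte 42 is arbitrary).
	hashA := externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{42})
	hashB := externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{42})

	toAdd := make(map[shardKey]string)
	toAdd[shardKey{hash: *hashA, windowSize: 2641}] = "first"
	toAdd[shardKey{hash: *hashB, windowSize: 2641}] = "second" // same key, overwrites "first"

	fmt.Println(len(toAdd)) // 1
}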
diff --git a/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_store.go b/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_store.go
new file mode 100644
index 000000000..053f72ccd
--- /dev/null
+++ b/domain/consensus/datastructures/blockwindowheapslicestore/block_window_heap_slice_store.go
@@ -0,0 +1,47 @@
+package blockwindowheapslicestore
+
+import (
+	"github.com/kaspanet/kaspad/domain/consensus/model"
+	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
+	"github.com/kaspanet/kaspad/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs"
+	"github.com/kaspanet/kaspad/infrastructure/db/database"
+	"github.com/kaspanet/kaspad/util/staging"
+	"github.com/pkg/errors"
+)
+
+type blockWindowHeapSliceStore struct {
+	shardID model.StagingShardID
+	cache   *lrucachehashandwindowsizetoblockghostdagdatahashpairs.LRUCache
+}
+
+// New instantiates a new WindowHeapSliceStore
+func New(cacheSize int, preallocate bool) model.WindowHeapSliceStore {
+	return &blockWindowHeapSliceStore{
+		shardID: staging.GenerateShardingID(),
+		cache:   lrucachehashandwindowsizetoblockghostdagdatahashpairs.New(cacheSize, preallocate),
+	}
+}
+
+// Stage stages the given heap slice for the given blockHash and windowSize
+func (bss *blockWindowHeapSliceStore) Stage(stagingArea *model.StagingArea, blockHash *externalapi.DomainHash, windowSize int, heapSlice []*externalapi.BlockGHOSTDAGDataHashPair) {
+	stagingShard := bss.stagingShard(stagingArea)
+	stagingShard.toAdd[newShardKey(blockHash, windowSize)] = heapSlice
+}
+
+func (bss *blockWindowHeapSliceStore) IsStaged(stagingArea *model.StagingArea) bool {
+	return bss.stagingShard(stagingArea).isStaged()
+}
+
+func (bss *blockWindowHeapSliceStore) Get(stagingArea *model.StagingArea, blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, error) {
+	stagingShard := bss.stagingShard(stagingArea)
+
+	if heapSlice, ok := stagingShard.toAdd[newShardKey(blockHash, windowSize)]; ok {
+		return heapSlice, nil
+	}
+
+	if heapSlice, ok := bss.cache.Get(blockHash, windowSize); ok {
+		return heapSlice, nil
+	}
+
+	return nil, errors.Wrap(database.ErrNotFound, "Window heap slice not found")
+}
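A rough usage sketch of the new store's exported API, assuming the usual kaspad staging-area flow; the hash bytes and the window size 2641 are made up, and the heap slice would normally come from calculateBlockWindowHeap:

package main

import (
	"fmt"

	"github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore"
	"github.com/kaspanet/kaspad/domain/consensus/model"
	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
)

func main() {
	store := blockwindowheapslicestore.New(2000, false)
	stagingArea := model.NewStagingArea()

	blockHash := externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{1})
	heapSlice := []*externalapi.BlockGHOSTDAGDataHashPair{} // placeholder for a real window heap slice

	// Stage records the slice in the staging shard's toAdd map.
	store.Stage(stagingArea, blockHash, 2641, heapSlice)
	fmt.Println(store.IsStaged(stagingArea)) // true

	// Before the staging area is committed, Get is answered from the shard;
	// after commit, the same lookup is served by the in-memory LRU cache.
	_, err := store.Get(stagingArea, blockHash, 2641)
	fmt.Println(err == nil) // true

	// A lookup with a different window size misses and returns an error
	// wrapping database.ErrNotFound.
	_, err = store.Get(stagingArea, blockHash, 1000)
	fmt.Println(err != nil) // true
}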
diff --git a/domain/consensus/factory.go b/domain/consensus/factory.go
index 0bd71f873..ee6dc0634 100644
--- a/domain/consensus/factory.go
+++ b/domain/consensus/factory.go
@@ -1,6 +1,7 @@
 package consensus
 
 import (
+	"github.com/kaspanet/kaspad/domain/consensus/datastructures/blockwindowheapslicestore"
 	"github.com/kaspanet/kaspad/domain/consensus/datastructures/daawindowstore"
 	"github.com/kaspanet/kaspad/domain/consensus/model"
 	"github.com/kaspanet/kaspad/domain/consensus/processes/blockparentbuilder"
@@ -144,9 +145,10 @@ func (f *factory) NewConsensus(config *Config, db infrastructuredatabase.Databas
 	finalityStore := finalitystore.New(prefixBucket, 200, preallocateCaches)
 	headersSelectedChainStore := headersselectedchainstore.New(prefixBucket, pruningWindowSizeForCaches, preallocateCaches)
 	daaBlocksStore := daablocksstore.New(prefixBucket, pruningWindowSizeForCaches, int(config.FinalityDepth()), preallocateCaches)
+	windowHeapSliceStore := blockwindowheapslicestore.New(2000, preallocateCaches)
 	blockRelationStores, reachabilityDataStores, ghostdagDataStores := dagStores(config, prefixBucket, pruningWindowSizePlusFinalityDepthForCache, pruningWindowSizeForCaches, preallocateCaches)
-	reachabilityManagers, dagTopologyManagers, ghostdagManagers, dagTraversalManagers := f.dagProcesses(config, dbManager, blockHeaderStore, daaWindowStore, blockRelationStores, reachabilityDataStores, ghostdagDataStores)
+	reachabilityManagers, dagTopologyManagers, ghostdagManagers, dagTraversalManagers := f.dagProcesses(config, dbManager, blockHeaderStore, daaWindowStore, windowHeapSliceStore, blockRelationStores, reachabilityDataStores, ghostdagDataStores)
 
 	blockRelationStore := blockRelationStores[0]
 	reachabilityDataStore := reachabilityDataStores[0]
@@ -600,6 +602,7 @@ func (f *factory) dagProcesses(config *Config,
 	dbManager model.DBManager,
 	blockHeaderStore model.BlockHeaderStore,
 	daaWindowStore model.BlocksWithTrustedDataDAAWindowStore,
+	windowHeapSliceStore model.WindowHeapSliceStore,
 	blockRelationStores []model.BlockRelationStore,
 	reachabilityDataStores []model.ReachabilityDataStore,
 	ghostdagDataStores []model.GHOSTDAGDataStore) (
@@ -641,6 +644,7 @@ func (f *factory) dagProcesses(config *Config,
 		reachabilityDataStores[i],
 		ghostdagManagers[i],
 		daaWindowStore,
+		windowHeapSliceStore,
 		config.GenesisHash,
 		config.DifficultyAdjustmentWindowSize)
 	}
diff --git a/domain/consensus/model/interface_datastructures_windowheapstore.go b/domain/consensus/model/interface_datastructures_windowheapstore.go
new file mode 100644
index 000000000..b594fc076
--- /dev/null
+++ b/domain/consensus/model/interface_datastructures_windowheapstore.go
@@ -0,0 +1,11 @@
+package model
+
+import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
+
+// WindowHeapSliceStore caches the slices that are needed for the heap implementation of DAGTraversalManager.BlockWindow
+type WindowHeapSliceStore interface {
+	Store
+	Stage(stagingArea *StagingArea, blockHash *externalapi.DomainHash, windowSize int, pairs []*externalapi.BlockGHOSTDAGDataHashPair)
+	IsStaged(stagingArea *StagingArea) bool
+	Get(stagingArea *StagingArea, blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, error)
+}
diff --git a/domain/consensus/processes/dagtraversalmanager/block_heap.go b/domain/consensus/processes/dagtraversalmanager/block_heap.go
index d6b9b6f1d..f515d7913 100644
--- a/domain/consensus/processes/dagtraversalmanager/block_heap.go
+++ b/domain/consensus/processes/dagtraversalmanager/block_heap.go
@@ -152,6 +152,18 @@ func (dtm *dagTraversalManager) newSizedUpHeap(stagingArea *model.StagingArea, c
 	return &h
 }
 
+func (dtm *dagTraversalManager) newSizedUpHeapFromSlice(stagingArea *model.StagingArea, slice []*externalapi.BlockGHOSTDAGDataHashPair) *sizedUpBlockHeap {
+	sliceClone := make([]*externalapi.BlockGHOSTDAGDataHashPair, len(slice), cap(slice))
+	copy(sliceClone, slice)
+	h := sizedUpBlockHeap{
+		impl:          upHeap{baseHeap{slice: sliceClone, ghostdagManager: dtm.ghostdagManager}},
+		ghostdagStore: dtm.ghostdagDataStore,
+		dbContext:     dtm.databaseContext,
+		stagingArea:   stagingArea,
+	}
+	return &h
+}
+
 // len returns the length of this heap
 func (sbh *sizedUpBlockHeap) len() int {
 	return sbh.impl.Len()
diff --git a/domain/consensus/processes/dagtraversalmanager/dagtraversalmanager.go b/domain/consensus/processes/dagtraversalmanager/dagtraversalmanager.go
index 5952227e2..6263cd4f3 100644
--- a/domain/consensus/processes/dagtraversalmanager/dagtraversalmanager.go
+++ b/domain/consensus/processes/dagtraversalmanager/dagtraversalmanager.go
@@ -18,6 +18,7 @@ type dagTraversalManager struct {
 	daaWindowStore                 model.BlocksWithTrustedDataDAAWindowStore
 	genesisHash                    *externalapi.DomainHash
 	difficultyAdjustmentWindowSize int
+	windowHeapSliceStore           model.WindowHeapSliceStore
 }
 
 // New instantiates a new DAGTraversalManager
@@ -28,6 +29,7 @@ func New(
 	reachabilityDataStore model.ReachabilityDataStore,
 	ghostdagManager model.GHOSTDAGManager,
 	daaWindowStore model.BlocksWithTrustedDataDAAWindowStore,
+	windowHeapSliceStore model.WindowHeapSliceStore,
 	genesisHash *externalapi.DomainHash,
 	difficultyAdjustmentWindowSize int) model.DAGTraversalManager {
 	return &dagTraversalManager{
@@ -40,6 +42,7 @@ func New(
 
 		genesisHash:                    genesisHash,
 		difficultyAdjustmentWindowSize: difficultyAdjustmentWindowSize,
+		windowHeapSliceStore:           windowHeapSliceStore,
 	}
 }
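One design note on newSizedUpHeapFromSlice above: the cached slice is copied before being handed to the heap, so pushes performed on the rebuilt heap cannot reorder or grow the slice that the WindowHeapSliceStore keeps. A tiny standalone illustration of why the copy matters (plain ints stand in for the hash pairs):

package main

import "fmt"

func main() {
	cached := []int{3, 1, 2} // stands in for a cached window heap slice

	// The same clone pattern as newSizedUpHeapFromSlice: a fresh backing
	// array with the original length and capacity, then a copy.
	clone := make([]int, len(cached), cap(cached))
	copy(clone, cached)

	// Mutating the clone (as heap pushes and re-ordering would) leaves the
	// cached slice untouched.
	clone[0] = 99
	fmt.Println(cached) // [3 1 2]
	fmt.Println(clone)  // [99 1 2]
}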
diff --git a/domain/consensus/processes/dagtraversalmanager/window.go b/domain/consensus/processes/dagtraversalmanager/window.go
index c71deffe6..a896305cb 100644
--- a/domain/consensus/processes/dagtraversalmanager/window.go
+++ b/domain/consensus/processes/dagtraversalmanager/window.go
@@ -16,7 +16,7 @@ func (dtm *dagTraversalManager) DAABlockWindow(stagingArea *model.StagingArea, h
 
 func (dtm *dagTraversalManager) BlockWindow(stagingArea *model.StagingArea, highHash *externalapi.DomainHash,
 	windowSize int) ([]*externalapi.DomainHash, error) {
-	windowHeap, err := dtm.calculateBlockWindowHeap(stagingArea, highHash, windowSize)
+	windowHeap, err := dtm.blockWindowHeap(stagingArea, highHash, windowSize)
 	if err != nil {
 		return nil, err
 	}
@@ -28,6 +28,28 @@ func (dtm *dagTraversalManager) BlockWindow(stagingArea *model.StagingArea, high
 	return window, nil
 }
 
+func (dtm *dagTraversalManager) blockWindowHeap(stagingArea *model.StagingArea,
+	highHash *externalapi.DomainHash, windowSize int) (*sizedUpBlockHeap, error) {
+	windowHeapSlice, err := dtm.windowHeapSliceStore.Get(stagingArea, highHash, windowSize)
+	sliceNotCached := database.IsNotFoundError(err)
+	if !sliceNotCached && err != nil {
+		return nil, err
+	}
+	if !sliceNotCached {
+		return dtm.newSizedUpHeapFromSlice(stagingArea, windowHeapSlice), nil
+	}
+
+	heap, err := dtm.calculateBlockWindowHeap(stagingArea, highHash, windowSize)
+	if err != nil {
+		return nil, err
+	}
+
+	if !highHash.Equal(model.VirtualBlockHash) {
+		dtm.windowHeapSliceStore.Stage(stagingArea, highHash, windowSize, heap.impl.slice)
+	}
+	return heap, nil
+}
+
 func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.StagingArea,
 	highHash *externalapi.DomainHash, windowSize int) (*sizedUpBlockHeap, error) {
 
@@ -45,18 +67,54 @@
 		return nil, err
 	}
 
+	// If the block has a trusted DAA window attached, we take it as is and don't use the selected parent's
+	// cached window to build this one. This is because tryPushMergeSet might not be able to find all the
+	// GHOSTDAG data that is associated with the block's merge set.
+	_, err = dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, 0)
+	isNonTrustedBlock := database.IsNotFoundError(err)
+	if !isNonTrustedBlock && err != nil {
+		return nil, err
+	}
+
+	if isNonTrustedBlock && currentGHOSTDAGData.SelectedParent() != nil {
+		windowHeapSlice, err := dtm.windowHeapSliceStore.Get(stagingArea, currentGHOSTDAGData.SelectedParent(), windowSize)
+		selectedParentNotCached := database.IsNotFoundError(err)
+		if !selectedParentNotCached && err != nil {
+			return nil, err
+		}
+		if !selectedParentNotCached {
+			windowHeap := dtm.newSizedUpHeapFromSlice(stagingArea, windowHeapSlice)
+			if !currentGHOSTDAGData.SelectedParent().Equal(dtm.genesisHash) {
+				selectedParentGHOSTDAGData, err := dtm.ghostdagDataStore.Get(
+					dtm.databaseContext, stagingArea, currentGHOSTDAGData.SelectedParent(), false)
+				if err != nil {
+					return nil, err
+				}
+
+				_, err = dtm.tryPushMergeSet(windowHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData)
+				if err != nil {
+					return nil, err
+				}
+			}
+
+			return windowHeap, nil
+		}
+	}
+
+	// Walk down the chain until we reach the genesis or find a trusted block, and then complete the rest
+	// of the window with the trusted window.
 	for {
 		if currentGHOSTDAGData.SelectedParent().Equal(dtm.genesisHash) {
 			break
 		}
 
 		_, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, 0)
-		isNotFoundError := database.IsNotFoundError(err)
-		if !isNotFoundError && err != nil {
+		currentIsNonTrustedBlock := database.IsNotFoundError(err)
+		if !currentIsNonTrustedBlock && err != nil {
 			return nil, err
 		}
 
-		if !isNotFoundError {
+		if !currentIsNonTrustedBlock {
 			for i := uint64(0); ; i++ {
 				daaBlock, err := dtm.daaWindowStore.DAAWindowBlock(dtm.databaseContext, stagingArea, current, i)
 				if database.IsNotFoundError(err) {
@@ -83,47 +141,60 @@ func (dtm *dagTraversalManager) calculateBlockWindowHeap(stagingArea *model.Stag
 		if err != nil {
 			return nil, err
 		}
-		added, err := windowHeap.tryPushWithGHOSTDAGData(currentGHOSTDAGData.SelectedParent(), selectedParentGHOSTDAGData)
+
+		done, err := dtm.tryPushMergeSet(windowHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData)
 		if err != nil {
 			return nil, err
 		}
-
-		// If the window is full and the selected parent is less than the minimum then we break
-		// because this means that there cannot be any more blocks in the past with higher blueWork
-		if !added {
+		if done {
 			break
 		}
 
-		// Now we go over the merge set.
-		// Remove the SP from the blue merge set because we already added it.
-		mergeSetBlues := currentGHOSTDAGData.MergeSetBlues()[1:]
-		// Go over the merge set in reverse because it's ordered in reverse by blueWork.
-		for i := len(mergeSetBlues) - 1; i >= 0; i-- {
-			added, err := windowHeap.tryPush(mergeSetBlues[i])
-			if err != nil {
-				return nil, err
-			}
-			// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
-			if !added {
-				break
-			}
-		}
-
-		mergeSetReds := currentGHOSTDAGData.MergeSetReds()
-		for i := len(mergeSetReds) - 1; i >= 0; i-- {
-			added, err := windowHeap.tryPush(mergeSetReds[i])
-			if err != nil {
-				return nil, err
-			}
-			// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
-			if !added {
-				break
-			}
-		}
-
 		current = currentGHOSTDAGData.SelectedParent()
 		currentGHOSTDAGData = selectedParentGHOSTDAGData
 	}
 
 	return windowHeap, nil
 }
+
+func (dtm *dagTraversalManager) tryPushMergeSet(windowHeap *sizedUpBlockHeap, currentGHOSTDAGData, selectedParentGHOSTDAGData *externalapi.BlockGHOSTDAGData) (bool, error) {
+	added, err := windowHeap.tryPushWithGHOSTDAGData(currentGHOSTDAGData.SelectedParent(), selectedParentGHOSTDAGData)
+	if err != nil {
+		return false, err
+	}
+
+	// If the window is full and the selected parent is less than the minimum then we break
+	// because this means that there cannot be any more blocks in the past with higher blueWork
+	if !added {
+		return true, nil
+	}
+
+	// Now we go over the merge set.
+	// Remove the SP from the blue merge set because we already added it.
+	mergeSetBlues := currentGHOSTDAGData.MergeSetBlues()[1:]
+	// Go over the merge set in reverse because it's ordered in reverse by blueWork.
+	for i := len(mergeSetBlues) - 1; i >= 0; i-- {
+		added, err := windowHeap.tryPush(mergeSetBlues[i])
+		if err != nil {
+			return false, err
+		}
+		// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
+		if !added {
+			break
+		}
+	}
+
+	mergeSetReds := currentGHOSTDAGData.MergeSetReds()
+	for i := len(mergeSetReds) - 1; i >= 0; i-- {
+		added, err := windowHeap.tryPush(mergeSetReds[i])
+		if err != nil {
+			return false, err
+		}
+		// If it's smaller than minimum then we won't be able to add the rest because they're even smaller.
+		if !added {
+			break
+		}
+	}
+
+	return false, nil
+}
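The early break in both merge-set loops above leans on the merge sets being ordered by blueWork: once one candidate fails to displace the heap's minimum, everything after it is smaller and cannot enter the window either. A standalone sketch of that argument with plain integers standing in for blueWork (illustrative only, not the consensus types):

package main

import "fmt"

// tryPush stands in for sizedUpBlockHeap.tryPush: it keeps the windowSize
// largest values and reports whether the candidate made it into the window.
func tryPush(window []int, windowSize, candidate int) ([]int, bool) {
	if len(window) < windowSize {
		return append(window, candidate), true
	}
	min := 0
	for i, v := range window {
		if v < window[min] {
			min = i
		}
	}
	if candidate <= window[min] {
		return window, false // candidate is too small to enter the window
	}
	window[min] = candidate
	return window, true
}

func main() {
	window := []int{}
	descending := []int{9, 7, 5, 3, 1} // analogous to a merge set ordered by blueWork
	for _, v := range descending {
		var added bool
		window, added = tryPush(window, 3, v)
		if !added {
			break // all remaining values are smaller, so none can enter the window
		}
	}
	fmt.Println(window) // [9 7 5]
}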
diff --git a/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs/lrucachehashandwindowsizetoblockghostdagdatahashpairs.go b/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs/lrucachehashandwindowsizetoblockghostdagdatahashpairs.go
new file mode 100644
index 000000000..ef2459c73
--- /dev/null
+++ b/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs/lrucachehashandwindowsizetoblockghostdagdatahashpairs.go
@@ -0,0 +1,79 @@
+package lrucachehashandwindowsizetoblockghostdagdatahashpairs
+
+import "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
+
+type lruKey struct {
+	blockHash  externalapi.DomainHash
+	windowSize int
+}
+
+func newKey(blockHash *externalapi.DomainHash, windowSize int) lruKey {
+	return lruKey{
+		blockHash:  *blockHash,
+		windowSize: windowSize,
+	}
+}
+
+// LRUCache is a least-recently-used cache from
+// lruKey to []*externalapi.BlockGHOSTDAGDataHashPair
+type LRUCache struct {
+	cache    map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair
+	capacity int
+}
+
+// New creates a new LRUCache
+func New(capacity int, preallocate bool) *LRUCache {
+	var cache map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair
+	if preallocate {
+		cache = make(map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair, capacity+1)
+	} else {
+		cache = make(map[lruKey][]*externalapi.BlockGHOSTDAGDataHashPair)
+	}
+	return &LRUCache{
+		cache:    cache,
+		capacity: capacity,
+	}
+}
+
+// Add adds an entry to the LRUCache
+func (c *LRUCache) Add(blockHash *externalapi.DomainHash, windowSize int, value []*externalapi.BlockGHOSTDAGDataHashPair) {
+	key := newKey(blockHash, windowSize)
+	c.cache[key] = value
+
+	if len(c.cache) > c.capacity {
+		c.evictRandom()
+	}
+}
+
+// Get returns the entry for the given key, or (nil, false) otherwise
+func (c *LRUCache) Get(blockHash *externalapi.DomainHash, windowSize int) ([]*externalapi.BlockGHOSTDAGDataHashPair, bool) {
+	key := newKey(blockHash, windowSize)
+	value, ok := c.cache[key]
+	if !ok {
+		return nil, false
+	}
+	return value, true
+}
+
+// Has returns whether the LRUCache contains the given key
+func (c *LRUCache) Has(blockHash *externalapi.DomainHash, windowSize int) bool {
+	key := newKey(blockHash, windowSize)
+	_, ok := c.cache[key]
+	return ok
+}
+
+// Remove removes the entry for the given key. Does nothing if
+// the entry does not exist
+func (c *LRUCache) Remove(blockHash *externalapi.DomainHash, windowSize int) {
+	key := newKey(blockHash, windowSize)
+	delete(c.cache, key)
+}
+
+func (c *LRUCache) evictRandom() {
+	var keyToEvict lruKey
+	for key := range c.cache {
+		keyToEvict = key
+		break
+	}
+	c.Remove(&keyToEvict.blockHash, keyToEvict.windowSize)
+}
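Finally, a short usage sketch of the new cache; the hash bytes and window sizes are made up. One thing worth noting: despite the LRUCache name, when the capacity is exceeded evictRandom drops an arbitrary entry, since Go map iteration order is unspecified, presumably mirroring the existing lrucache utilities in kaspad.

package main

import (
	"fmt"

	"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
	"github.com/kaspanet/kaspad/domain/consensus/utils/lrucachehashandwindowsizetoblockghostdagdatahashpairs"
)

func main() {
	cache := lrucachehashandwindowsizetoblockghostdagdatahashpairs.New(10, false)

	blockHash := externalapi.NewDomainHashFromByteArray(&[externalapi.DomainHashSize]byte{7})
	pairs := []*externalapi.BlockGHOSTDAGDataHashPair{} // placeholder for a real window heap slice

	cache.Add(blockHash, 2641, pairs)
	fmt.Println(cache.Has(blockHash, 2641)) // true
	fmt.Println(cache.Has(blockHash, 1000)) // false: the window size is part of the key

	cache.Remove(blockHash, 2641)
	fmt.Println(cache.Has(blockHash, 2641)) // false
}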