Improved staging shard performance (#2034)

using map instead of growing array as shard id can be 1000+ in some cases

Co-authored-by: Ori Newman <orinewman1@gmail.com>
This commit is contained in:
biospb 2022-05-04 21:24:25 +03:00 committed by GitHub
parent 2b5202be7a
commit 1660cf0cf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,8 @@
package model package model
import "github.com/pkg/errors" import (
"github.com/pkg/errors"
)
// StagingShard is an interface that enables every store to have it's own Commit logic // StagingShard is an interface that enables every store to have it's own Commit logic
// See StagingArea for more details // See StagingArea for more details
@ -19,14 +21,14 @@ type StagingShardID uint64
// When the StagingArea is being Committed, it goes over all it's shards, and commits those one-by-one. // When the StagingArea is being Committed, it goes over all it's shards, and commits those one-by-one.
// Since Commit happens in a DatabaseTransaction, a StagingArea is atomic. // Since Commit happens in a DatabaseTransaction, a StagingArea is atomic.
type StagingArea struct { type StagingArea struct {
shards []StagingShard shards map[StagingShardID]StagingShard
isCommitted bool isCommitted bool
} }
// NewStagingArea creates a new, empty staging area. // NewStagingArea creates a new, empty staging area.
func NewStagingArea() *StagingArea { func NewStagingArea() *StagingArea {
return &StagingArea{ return &StagingArea{
shards: []StagingShard{}, shards: make(map[StagingShardID]StagingShard),
isCommitted: false, isCommitted: false,
} }
} }
@ -34,14 +36,12 @@ func NewStagingArea() *StagingArea {
// GetOrCreateShard attempts to retrieve a shard with the given name. // GetOrCreateShard attempts to retrieve a shard with the given name.
// If it does not exist - a new shard is created using `createFunc`. // If it does not exist - a new shard is created using `createFunc`.
func (sa *StagingArea) GetOrCreateShard(shardID StagingShardID, createFunc func() StagingShard) StagingShard { func (sa *StagingArea) GetOrCreateShard(shardID StagingShardID, createFunc func() StagingShard) StagingShard {
for uint64(len(sa.shards)) <= uint64(shardID) { shard, ok := sa.shards[shardID]
sa.shards = append(sa.shards, nil) if !ok {
shard = createFunc()
sa.shards[shardID] = shard
} }
if sa.shards[shardID] == nil { return shard
sa.shards[shardID] = createFunc()
}
return sa.shards[shardID]
} }
// Commit goes over all the Shards in the StagingArea and commits them, inside the provided database transaction. // Commit goes over all the Shards in the StagingArea and commits them, inside the provided database transaction.
@ -52,9 +52,6 @@ func (sa *StagingArea) Commit(dbTx DBTransaction) error {
} }
for _, shard := range sa.shards { for _, shard := range sa.shards {
if shard == nil { // since sa.shards is an array and not a map, some shard slots might be empty.
continue
}
err := shard.Commit(dbTx) err := shard.Commit(dbTx)
if err != nil { if err != nil {
return err return err