server: Move reading KV index inside scheduleCompaction function

Makes it easier to test hash match between scheduleCompaction and
HashByRev.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2022-05-19 10:22:46 +02:00
parent 638ca1006a
commit 08a43178fa
4 changed files with 22 additions and 14 deletions

View File

@ -229,10 +229,7 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
s.compactBarrier(ctx, ch)
return
}
start := time.Now()
keep := s.kvindex.Compact(rev)
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
if err := s.scheduleCompaction(rev, keep); err != nil {
if err := s.scheduleCompaction(rev); err != nil {
s.lg.Warn("Failed compaction", zap.Error(err))
s.compactBarrier(context.TODO(), ch)
return

View File

@ -23,8 +23,12 @@ import (
"go.uber.org/zap"
)
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) error {
func (s *store) scheduleCompaction(compactMainRev int64) error {
totalStart := time.Now()
keep := s.kvindex.Compact(compactMainRev)
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
totalStart = time.Now()
defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
keyCompactions := 0
defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()

View File

@ -69,6 +69,10 @@ func TestScheduleCompaction(t *testing.T) {
for i, tt := range tests {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
fi := newFakeIndex()
fi.indexCompactRespc <- tt.keep
s.kvindex = fi
tx := s.b.BatchTx()
tx.Lock()
@ -79,7 +83,7 @@ func TestScheduleCompaction(t *testing.T) {
}
tx.Unlock()
err := s.scheduleCompaction(tt.rev, tt.keep)
err := s.scheduleCompaction(tt.rev)
if err != nil {
t.Fatal(err)
}

View File

@ -849,18 +849,11 @@ func newFakeStore(lg *zap.Logger) *store {
b := &fakeBackend{&fakeBatchTx{
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
fi := &fakeIndex{
Recorder: &testutil.RecorderBuffered{},
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1),
}
s := &store{
cfg: StoreConfig{CompactionBatchLimit: 10000},
b: b,
le: &lease.FakeLessor{},
kvindex: fi,
kvindex: newFakeIndex(),
currentRev: 0,
compactMainRev: -1,
fifoSched: schedule.NewFIFOScheduler(),
@ -871,6 +864,16 @@ func newFakeStore(lg *zap.Logger) *store {
return s
}
func newFakeIndex() *fakeIndex {
return &fakeIndex{
Recorder: &testutil.RecorderBuffered{},
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1),
}
}
type rangeResp struct {
keys [][]byte
vals [][]byte