mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Move reading KV index inside scheduleCompaction function
Makes it easier to test hash match between scheduleCompaction and HashByRev. Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@@ -233,10 +233,7 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
||||
s.compactBarrier(ctx, ch)
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
keep := s.kvindex.Compact(rev)
|
||||
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||
if err := s.scheduleCompaction(rev, keep); err != nil {
|
||||
if err := s.scheduleCompaction(rev); err != nil {
|
||||
s.lg.Warn("Failed compaction", zap.Error(err))
|
||||
s.compactBarrier(context.TODO(), ch)
|
||||
return
|
||||
|
||||
@@ -23,8 +23,12 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) error {
|
||||
func (s *store) scheduleCompaction(compactMainRev int64) error {
|
||||
totalStart := time.Now()
|
||||
keep := s.kvindex.Compact(compactMainRev)
|
||||
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
|
||||
|
||||
totalStart = time.Now()
|
||||
defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
|
||||
keyCompactions := 0
|
||||
defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestScheduleCompaction(t *testing.T) {
|
||||
@@ -68,7 +69,11 @@ func TestScheduleCompaction(t *testing.T) {
|
||||
}
|
||||
for i, tt := range tests {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
fi := newFakeIndex()
|
||||
fi.indexCompactRespc <- tt.keep
|
||||
s.kvindex = fi
|
||||
|
||||
tx := s.b.BatchTx()
|
||||
|
||||
tx.Lock()
|
||||
@@ -79,7 +84,7 @@ func TestScheduleCompaction(t *testing.T) {
|
||||
}
|
||||
tx.Unlock()
|
||||
|
||||
err := s.scheduleCompaction(tt.rev, tt.keep)
|
||||
err := s.scheduleCompaction(tt.rev)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -839,18 +839,11 @@ func newFakeStore() *store {
|
||||
b := &fakeBackend{&fakeBatchTx{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
rangeRespc: make(chan rangeResp, 5)}}
|
||||
fi := &fakeIndex{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
indexGetRespc: make(chan indexGetResp, 1),
|
||||
indexRangeRespc: make(chan indexRangeResp, 1),
|
||||
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
|
||||
indexCompactRespc: make(chan map[revision]struct{}, 1),
|
||||
}
|
||||
s := &store{
|
||||
cfg: StoreConfig{CompactionBatchLimit: 10000},
|
||||
b: b,
|
||||
le: &lease.FakeLessor{},
|
||||
kvindex: fi,
|
||||
kvindex: newFakeIndex(),
|
||||
currentRev: 0,
|
||||
compactMainRev: -1,
|
||||
fifoSched: schedule.NewFIFOScheduler(),
|
||||
@@ -861,6 +854,16 @@ func newFakeStore() *store {
|
||||
return s
|
||||
}
|
||||
|
||||
func newFakeIndex() *fakeIndex {
|
||||
return &fakeIndex{
|
||||
Recorder: &testutil.RecorderBuffered{},
|
||||
indexGetRespc: make(chan indexGetResp, 1),
|
||||
indexRangeRespc: make(chan indexRangeResp, 1),
|
||||
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
|
||||
indexCompactRespc: make(chan map[revision]struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type rangeResp struct {
|
||||
keys [][]byte
|
||||
vals [][]byte
|
||||
|
||||
Reference in New Issue
Block a user