mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Return error from scheduleCompaction
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
@@ -236,7 +236,8 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
keep := s.kvindex.Compact(rev)
|
keep := s.kvindex.Compact(rev)
|
||||||
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
if !s.scheduleCompaction(rev, keep) {
|
if err := s.scheduleCompaction(rev, keep); err != nil {
|
||||||
|
s.lg.Warn("Failed compaction", zap.Error(err))
|
||||||
s.compactBarrier(context.TODO(), ch)
|
s.compactBarrier(context.TODO(), ch)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,13 +16,14 @@ package mvcc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
|
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) error {
|
||||||
totalStart := time.Now()
|
totalStart := time.Now()
|
||||||
defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
|
defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
|
||||||
keyCompactions := 0
|
keyCompactions := 0
|
||||||
@@ -59,7 +60,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
|||||||
zap.Int64("compact-revision", compactMainRev),
|
zap.Int64("compact-revision", compactMainRev),
|
||||||
zap.Duration("took", time.Since(totalStart)),
|
zap.Duration("took", time.Since(totalStart)),
|
||||||
)
|
)
|
||||||
return true
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// update last
|
// update last
|
||||||
@@ -72,7 +73,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
|
|||||||
select {
|
select {
|
||||||
case <-time.After(10 * time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
case <-s.stopc:
|
case <-s.stopc:
|
||||||
return false
|
return fmt.Errorf("interrupted due to stop signal")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,7 +79,10 @@ func TestScheduleCompaction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tx.Unlock()
|
tx.Unlock()
|
||||||
|
|
||||||
s.scheduleCompaction(tt.rev, tt.keep)
|
err := s.scheduleCompaction(tt.rev, tt.keep)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
for _, rev := range tt.wrevs {
|
for _, rev := range tt.wrevs {
|
||||||
|
|||||||
Reference in New Issue
Block a user