mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Calculate hash during compaction
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
f1a759a2c8
commit
21e5d5d2b6
@ -137,6 +137,36 @@ type kvHash struct {
|
|||||||
revision int64
|
revision int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Change this to fuzz test
|
||||||
|
func TestCompactionHash(t *testing.T) {
|
||||||
|
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
|
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
|
|
||||||
|
var totalRevisions int64 = 1210
|
||||||
|
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
|
||||||
|
assert.Less(t, int64(compactionCycle*10), totalRevisions)
|
||||||
|
var rev int64
|
||||||
|
for ; rev < totalRevisions; rev += compactionCycle {
|
||||||
|
testCompactionHash(t, s, rev, rev+compactionCycle)
|
||||||
|
}
|
||||||
|
testCompactionHash(t, s, rev, rev+totalRevisions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testCompactionHash(t *testing.T, s *store, start, stop int64) {
|
||||||
|
for i := start; i <= stop; i++ {
|
||||||
|
s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0)
|
||||||
|
}
|
||||||
|
hash1, _, _, err := s.HashByRev(stop)
|
||||||
|
assert.NoError(t, err, "error on rev %v", stop)
|
||||||
|
|
||||||
|
_, prevCompactRev, err := s.updateCompactRev(stop)
|
||||||
|
assert.NoError(t, err, "error on rev %v", stop)
|
||||||
|
|
||||||
|
hash2, err := s.scheduleCompaction(stop, prevCompactRev)
|
||||||
|
assert.NoError(t, err, "error on rev %v", stop)
|
||||||
|
assert.Equal(t, hash1, hash2, "hashes do not match on rev %v", stop)
|
||||||
|
}
|
||||||
|
|
||||||
func pickKey(i int64) string {
|
func pickKey(i int64) string {
|
||||||
if i%(compactionCycle*2) == 30 {
|
if i%(compactionCycle*2) == 30 {
|
||||||
return "zenek"
|
return "zenek"
|
||||||
|
@ -189,26 +189,25 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
|
|||||||
tx.RLock()
|
tx.RLock()
|
||||||
defer tx.RUnlock()
|
defer tx.RUnlock()
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
|
|
||||||
hash, err = unsafeHashByRev(tx, revision{main: compactRev + 1}, revision{main: rev + 1}, keep)
|
hash, err = unsafeHashByRev(tx, revision{main: compactRev + 1}, revision{main: rev + 1}, keep)
|
||||||
hashRevSec.Observe(time.Since(start).Seconds())
|
hashRevSec.Observe(time.Since(start).Seconds())
|
||||||
return hash, currentRev, compactRev, err
|
return hash, currentRev, compactRev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
||||||
s.revMu.Lock()
|
s.revMu.Lock()
|
||||||
if rev <= s.compactMainRev {
|
if rev <= s.compactMainRev {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
|
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
|
||||||
s.fifoSched.Schedule(f)
|
s.fifoSched.Schedule(f)
|
||||||
s.revMu.Unlock()
|
s.revMu.Unlock()
|
||||||
return ch, ErrCompacted
|
return ch, 0, ErrCompacted
|
||||||
}
|
}
|
||||||
if rev > s.currentRev {
|
if rev > s.currentRev {
|
||||||
s.revMu.Unlock()
|
s.revMu.Unlock()
|
||||||
return nil, ErrFutureRev
|
return nil, 0, ErrFutureRev
|
||||||
}
|
}
|
||||||
|
compactMainRev := s.compactMainRev
|
||||||
s.compactMainRev = rev
|
s.compactMainRev = rev
|
||||||
|
|
||||||
rbytes := newRevBytes()
|
rbytes := newRevBytes()
|
||||||
@ -223,17 +222,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
|
|||||||
|
|
||||||
s.revMu.Unlock()
|
s.revMu.Unlock()
|
||||||
|
|
||||||
return nil, nil
|
return nil, compactMainRev, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
var j = func(ctx context.Context) {
|
var j = func(ctx context.Context) {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
s.compactBarrier(ctx, ch)
|
s.compactBarrier(ctx, ch)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := s.scheduleCompaction(rev); err != nil {
|
if _, err := s.scheduleCompaction(rev, prevCompactRev); err != nil {
|
||||||
s.lg.Warn("Failed compaction", zap.Error(err))
|
s.lg.Warn("Failed compaction", zap.Error(err))
|
||||||
s.compactBarrier(context.TODO(), ch)
|
s.compactBarrier(context.TODO(), ch)
|
||||||
return
|
return
|
||||||
@ -247,18 +246,18 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
||||||
ch, err := s.updateCompactRev(rev)
|
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ch, err
|
return ch, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.compact(traceutil.TODO(), rev)
|
return s.compact(traceutil.TODO(), rev, prevCompactRev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
|
||||||
ch, err := s.updateCompactRev(rev)
|
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||||
trace.Step("check and update compact revision")
|
trace.Step("check and update compact revision")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
@ -266,7 +265,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
|||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
return s.compact(trace, rev)
|
return s.compact(trace, rev, prevCompactRev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Commit() {
|
func (s *store) Commit() {
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *store) scheduleCompaction(compactMainRev int64) error {
|
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32, error) {
|
||||||
totalStart := time.Now()
|
totalStart := time.Now()
|
||||||
keep := s.kvindex.Compact(compactMainRev)
|
keep := s.kvindex.Compact(compactMainRev)
|
||||||
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
|
indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
|
||||||
@ -37,6 +37,8 @@ func (s *store) scheduleCompaction(compactMainRev int64) error {
|
|||||||
end := make([]byte, 8)
|
end := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
||||||
|
|
||||||
|
batchNum := s.cfg.CompactionBatchLimit
|
||||||
|
h := newKVHasher(revision{main: prevCompactRev + 1}, revision{main: compactMainRev + 1}, keep)
|
||||||
last := make([]byte, 8+1+8)
|
last := make([]byte, 8+1+8)
|
||||||
for {
|
for {
|
||||||
var rev revision
|
var rev revision
|
||||||
@ -45,13 +47,14 @@ func (s *store) scheduleCompaction(compactMainRev int64) error {
|
|||||||
|
|
||||||
tx := s.b.BatchTx()
|
tx := s.b.BatchTx()
|
||||||
tx.LockOutsideApply()
|
tx.LockOutsideApply()
|
||||||
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit))
|
keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
|
||||||
for _, key := range keys {
|
for i := range keys {
|
||||||
rev = bytesToRev(key)
|
rev = bytesToRev(keys[i])
|
||||||
if _, ok := keep[rev]; !ok {
|
if _, ok := keep[rev]; !ok {
|
||||||
tx.UnsafeDelete(buckets.Key, key)
|
tx.UnsafeDelete(buckets.Key, keys[i])
|
||||||
keyCompactions++
|
keyCompactions++
|
||||||
}
|
}
|
||||||
|
h.WriteKeyValue(keys[i], values[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(keys) < s.cfg.CompactionBatchLimit {
|
if len(keys) < s.cfg.CompactionBatchLimit {
|
||||||
@ -59,12 +62,14 @@ func (s *store) scheduleCompaction(compactMainRev int64) error {
|
|||||||
revToBytes(revision{main: compactMainRev}, rbytes)
|
revToBytes(revision{main: compactMainRev}, rbytes)
|
||||||
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
|
tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
|
||||||
tx.Unlock()
|
tx.Unlock()
|
||||||
|
hash := h.Hash()
|
||||||
s.lg.Info(
|
s.lg.Info(
|
||||||
"finished scheduled compaction",
|
"finished scheduled compaction",
|
||||||
zap.Int64("compact-revision", compactMainRev),
|
zap.Int64("compact-revision", compactMainRev),
|
||||||
zap.Duration("took", time.Since(totalStart)),
|
zap.Duration("took", time.Since(totalStart)),
|
||||||
|
zap.Uint32("hash", hash),
|
||||||
)
|
)
|
||||||
return nil
|
return hash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// update last
|
// update last
|
||||||
@ -77,7 +82,7 @@ func (s *store) scheduleCompaction(compactMainRev int64) error {
|
|||||||
select {
|
select {
|
||||||
case <-time.After(10 * time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
case <-s.stopc:
|
case <-s.stopc:
|
||||||
return fmt.Errorf("interrupted due to stop signal")
|
return 0, fmt.Errorf("interrupted due to stop signal")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -84,9 +84,9 @@ func TestScheduleCompaction(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tx.Unlock()
|
tx.Unlock()
|
||||||
|
|
||||||
err := s.scheduleCompaction(tt.rev)
|
_, err := s.scheduleCompaction(tt.rev, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user