From 21e5d5d2b6427c93e4900c944712b3175e6327ab Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 19 May 2022 12:16:52 +0200 Subject: [PATCH] server: Calculate hash during compaction Signed-off-by: Marek Siarkowicz --- server/mvcc/hash_test.go | 30 ++++++++++++++++++++++++++ server/mvcc/kvstore.go | 23 ++++++++++---------- server/mvcc/kvstore_compaction.go | 19 ++++++++++------ server/mvcc/kvstore_compaction_test.go | 4 ++-- 4 files changed, 55 insertions(+), 21 deletions(-) diff --git a/server/mvcc/hash_test.go b/server/mvcc/hash_test.go index f0c6aecb9..651551c9b 100644 --- a/server/mvcc/hash_test.go +++ b/server/mvcc/hash_test.go @@ -137,6 +137,36 @@ type kvHash struct { revision int64 } +// TODO: Change this to fuzz test +func TestCompactionHash(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + + var totalRevisions int64 = 1210 + assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) + assert.Less(t, int64(compactionCycle*10), totalRevisions) + var rev int64 + for ; rev < totalRevisions; rev += compactionCycle { + testCompactionHash(t, s, rev, rev+compactionCycle) + } + testCompactionHash(t, s, rev, rev+totalRevisions) +} + +func testCompactionHash(t *testing.T, s *store, start, stop int64) { + for i := start; i <= stop; i++ { + s.Put([]byte(pickKey(i)), []byte(fmt.Sprint(i)), 0) + } + hash1, _, _, err := s.HashByRev(stop) + assert.NoError(t, err, "error on rev %v", stop) + + _, prevCompactRev, err := s.updateCompactRev(stop) + assert.NoError(t, err, "error on rev %v", stop) + + hash2, err := s.scheduleCompaction(stop, prevCompactRev) + assert.NoError(t, err, "error on rev %v", stop) + assert.Equal(t, hash1, hash2, "hashes do not match on rev %v", stop) +} + func pickKey(i int64) string { if i%(compactionCycle*2) == 30 { return "zenek" diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 9d971fa8d..15eb4e6b7 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -189,26 +189,25 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev tx.RLock() defer tx.RUnlock() s.mu.RUnlock() - hash, err = unsafeHashByRev(tx, revision{main: compactRev + 1}, revision{main: rev + 1}, keep) hashRevSec.Observe(time.Since(start).Seconds()) return hash, currentRev, compactRev, err } -func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { +func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { s.revMu.Lock() if rev <= s.compactMainRev { ch := make(chan struct{}) f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } s.fifoSched.Schedule(f) s.revMu.Unlock() - return ch, ErrCompacted + return ch, 0, ErrCompacted } if rev > s.currentRev { s.revMu.Unlock() - return nil, ErrFutureRev + return nil, 0, ErrFutureRev } - + compactMainRev := s.compactMainRev s.compactMainRev = rev rbytes := newRevBytes() @@ -223,17 +222,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { s.revMu.Unlock() - return nil, nil + return nil, compactMainRev, nil } -func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { s.compactBarrier(ctx, ch) return } - if err := s.scheduleCompaction(rev); err != nil { + if _, err := s.scheduleCompaction(rev, prevCompactRev); err != nil { s.lg.Warn("Failed compaction", zap.Error(err)) s.compactBarrier(context.TODO(), ch) return @@ -247,18 +246,18 @@ func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { - ch, err := s.updateCompactRev(rev) + ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev) + return s.compact(traceutil.TODO(), rev, prevCompactRev) } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() - ch, err := s.updateCompactRev(rev) + ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { s.mu.Unlock() @@ -266,7 +265,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev) + return s.compact(trace, rev, prevCompactRev) } func (s *store) Commit() { diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index b47506dd5..c07b9e96e 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" ) -func (s *store) scheduleCompaction(compactMainRev int64) error { +func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (uint32, error) { totalStart := time.Now() keep := s.kvindex.Compact(compactMainRev) indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) @@ -37,6 +37,8 @@ func (s *store) scheduleCompaction(compactMainRev int64) error { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) + batchNum := s.cfg.CompactionBatchLimit + h := newKVHasher(revision{main: prevCompactRev + 1}, revision{main: compactMainRev + 1}, keep) last := make([]byte, 8+1+8) for { var rev revision @@ -45,13 +47,14 @@ func (s *store) scheduleCompaction(compactMainRev int64) error { tx := s.b.BatchTx() tx.LockOutsideApply() - keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) - for _, key := range keys { - rev = bytesToRev(key) + keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum)) + for i := range keys { + rev = bytesToRev(keys[i]) if _, ok := keep[rev]; !ok { - tx.UnsafeDelete(buckets.Key, key) + tx.UnsafeDelete(buckets.Key, keys[i]) keyCompactions++ } + h.WriteKeyValue(keys[i], values[i]) } if len(keys) < s.cfg.CompactionBatchLimit { @@ -59,12 +62,14 @@ func (s *store) scheduleCompaction(compactMainRev int64) error { revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes) tx.Unlock() + hash := h.Hash() s.lg.Info( "finished scheduled compaction", zap.Int64("compact-revision", compactMainRev), zap.Duration("took", time.Since(totalStart)), + zap.Uint32("hash", hash), ) - return nil + return hash, nil } // update last @@ -77,7 +82,7 @@ func (s *store) scheduleCompaction(compactMainRev int64) error { select { case <-time.After(10 * time.Millisecond): case <-s.stopc: - return fmt.Errorf("interrupted due to stop signal") + return 0, fmt.Errorf("interrupted due to stop signal") } } } diff --git a/server/mvcc/kvstore_compaction_test.go b/server/mvcc/kvstore_compaction_test.go index 8c163b542..8ff36c1f8 100644 --- a/server/mvcc/kvstore_compaction_test.go +++ b/server/mvcc/kvstore_compaction_test.go @@ -84,9 +84,9 @@ func TestScheduleCompaction(t *testing.T) { } tx.Unlock() - err := s.scheduleCompaction(tt.rev) + _, err := s.scheduleCompaction(tt.rev, 0) if err != nil { - t.Fatal(err) + t.Error(err) } tx.Lock()