From deca9879c2694218fb7d65240724bd2d6f03d970 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 14 Jul 2017 16:44:16 -0700 Subject: [PATCH] mvcc: add HashByRev to kv.go HashByRev computes the hash of all MVCC keys up to a given revision. --- mvcc/kv.go | 6 +++-- mvcc/kvstore.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/mvcc/kv.go b/mvcc/kv.go index 6636347aa..2dad3ad8e 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -107,10 +107,12 @@ type KV interface { // Write creates a write transaction. Write() TxnWrite - // Hash retrieves the hash of KV state and revision. - // This method is designed for consistency checking purposes. + // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) + // HashByRev computes the hash of all MVCC revisions up to a given revision. + HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) + // Compact frees all superseded keys with revisions less than rev. Compact(rev int64) (<-chan struct{}, error) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 34fc76172..618ca0786 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -17,6 +17,7 @@ package mvcc import ( "encoding/binary" "errors" + "hash/crc32" "math" "sync" "sync/atomic" @@ -44,6 +45,8 @@ var ( ErrClosed = errors.New("mvcc: closed") plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") + + emptyKeep = make(map[revision]struct{}) ) const ( @@ -98,6 +101,12 @@ type store struct { fifoSched schedule.Scheduler stopc chan struct{} + + // keepMu protects keep + keepMu sync.RWMutex + // keep contains all revisions <= compactMainRev to be kept for the + // ongoing compaction; nil otherwise. + keep map[revision]struct{} } // NewStore returns a new store. It is useful to create a store inside @@ -160,6 +169,63 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } +func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { + s.mu.Lock() + s.revMu.RLock() + compactRev, currentRev = s.compactMainRev, s.currentRev + s.revMu.RUnlock() + + if rev > 0 && rev <= compactRev { + s.mu.Unlock() + return 0, 0, compactRev, ErrCompacted + } else if rev > 0 && rev > currentRev { + s.mu.Unlock() + return 0, currentRev, 0, ErrFutureRev + } + + s.keepMu.Lock() + if s.keep == nil { + // ForceCommit ensures that txnRead begins after backend + // has committed all the changes from the prev completed compaction. + s.b.ForceCommit() + s.keep = emptyKeep + } + keep := s.keep + s.keepMu.Unlock() + + tx := s.b.ReadTx() + tx.Lock() + defer tx.Unlock() + s.mu.Unlock() + + if rev == 0 { + rev = currentRev + } + + upper := revision{main: rev + 1} + lower := revision{main: compactRev + 1} + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + h.Write(keyBucketName) + err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { + kr := bytesToRev(k) + if !upper.GreaterThan(kr) { + return nil + } + // skip revisions that are scheduled for deletion + // due to compacting; don't skip if there isn't one. + if lower.GreaterThan(kr) && len(keep) > 0 { + if _, ok := keep[kr]; !ok { + return nil + } + } + h.Write(k) + h.Write(v) + return nil + }) + return h.Sum32(), currentRev, compactRev, err +} + func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() @@ -191,6 +257,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.b.ForceCommit() keep := s.kvindex.Compact(rev) + s.keepMu.Lock() + s.keep = keep + s.keepMu.Unlock() ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -202,6 +271,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { return } close(ch) + s.keepMu.Lock() + s.keep = nil + s.keepMu.Unlock() } s.fifoSched.Schedule(j)