mvcc: add HashByRev to kv.go

HashByRev computes the hash of all MVCC keys up to a given revision.
This commit is contained in:
fanmin shi 2017-07-14 16:44:16 -07:00
parent fbb75d24a4
commit deca9879c2
2 changed files with 76 additions and 2 deletions

View File

@ -107,10 +107,12 @@ type KV interface {
// Write creates a write transaction.
Write() TxnWrite
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)

View File

@ -17,6 +17,7 @@ package mvcc
import (
"encoding/binary"
"errors"
"hash/crc32"
"math"
"sync"
"sync/atomic"
@ -44,6 +45,8 @@ var (
ErrClosed = errors.New("mvcc: closed")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
emptyKeep = make(map[revision]struct{})
)
const (
@ -98,6 +101,12 @@ type store struct {
fifoSched schedule.Scheduler
stopc chan struct{}
// keepMu protects keep
keepMu sync.RWMutex
// keep contains all revisions <= compactMainRev to be kept for the
// ongoing compaction; nil otherwise.
keep map[revision]struct{}
}
// NewStore returns a new store. It is useful to create a store inside
@ -160,6 +169,63 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err
}
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
s.mu.Lock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()
if rev > 0 && rev <= compactRev {
s.mu.Unlock()
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
s.mu.Unlock()
return 0, currentRev, 0, ErrFutureRev
}
s.keepMu.Lock()
if s.keep == nil {
// ForceCommit ensures that txnRead begins after backend
// has committed all the changes from the prev completed compaction.
s.b.ForceCommit()
s.keep = emptyKeep
}
keep := s.keep
s.keepMu.Unlock()
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
s.mu.Unlock()
if rev == 0 {
rev = currentRev
}
upper := revision{main: rev + 1}
lower := revision{main: compactRev + 1}
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(keyBucketName)
err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
kr := bytesToRev(k)
if !upper.GreaterThan(kr) {
return nil
}
// skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(keep) > 0 {
if _, ok := keep[kr]; !ok {
return nil
}
}
h.Write(k)
h.Write(v)
return nil
})
return h.Sum32(), currentRev, compactRev, err
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -191,6 +257,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.b.ForceCommit()
keep := s.kvindex.Compact(rev)
s.keepMu.Lock()
s.keep = keep
s.keepMu.Unlock()
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
@ -202,6 +271,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return
}
close(ch)
s.keepMu.Lock()
s.keep = nil
s.keepMu.Unlock()
}
s.fifoSched.Schedule(j)