mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc: HashKV gets keep from kvindex.Keep
This commit is contained in:
parent
4c2c5b0084
commit
bb86c327e2
@ -45,8 +45,6 @@ var (
|
|||||||
ErrClosed = errors.New("mvcc: closed")
|
ErrClosed = errors.New("mvcc: closed")
|
||||||
|
|
||||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
|
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
|
||||||
|
|
||||||
emptyKeep = make(map[revision]struct{})
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,12 +99,6 @@ type store struct {
|
|||||||
fifoSched schedule.Scheduler
|
fifoSched schedule.Scheduler
|
||||||
|
|
||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
|
|
||||||
// keepMu protects keep
|
|
||||||
keepMu sync.RWMutex
|
|
||||||
// keep contains all revisions <= compactMainRev to be kept for the
|
|
||||||
// ongoing compaction; nil otherwise.
|
|
||||||
keep map[revision]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStore returns a new store. It is useful to create a store inside
|
// NewStore returns a new store. It is useful to create a store inside
|
||||||
@ -170,33 +162,25 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
|
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
|
||||||
s.mu.Lock()
|
s.mu.RLock()
|
||||||
s.revMu.RLock()
|
s.revMu.RLock()
|
||||||
compactRev, currentRev = s.compactMainRev, s.currentRev
|
compactRev, currentRev = s.compactMainRev, s.currentRev
|
||||||
s.revMu.RUnlock()
|
s.revMu.RUnlock()
|
||||||
|
|
||||||
if rev > 0 && rev <= compactRev {
|
if rev > 0 && rev <= compactRev {
|
||||||
s.mu.Unlock()
|
s.mu.RUnlock()
|
||||||
return 0, 0, compactRev, ErrCompacted
|
return 0, 0, compactRev, ErrCompacted
|
||||||
} else if rev > 0 && rev > currentRev {
|
} else if rev > 0 && rev > currentRev {
|
||||||
s.mu.Unlock()
|
s.mu.RUnlock()
|
||||||
return 0, currentRev, 0, ErrFutureRev
|
return 0, currentRev, 0, ErrFutureRev
|
||||||
}
|
}
|
||||||
|
|
||||||
s.keepMu.Lock()
|
keep := s.kvindex.Keep(rev)
|
||||||
if s.keep == nil {
|
|
||||||
// ForceCommit ensures that txnRead begins after backend
|
|
||||||
// has committed all the changes from the prev completed compaction.
|
|
||||||
s.b.ForceCommit()
|
|
||||||
s.keep = emptyKeep
|
|
||||||
}
|
|
||||||
keep := s.keep
|
|
||||||
s.keepMu.Unlock()
|
|
||||||
|
|
||||||
tx := s.b.ReadTx()
|
tx := s.b.ReadTx()
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
s.mu.Unlock()
|
s.mu.RUnlock()
|
||||||
|
|
||||||
if rev == 0 {
|
if rev == 0 {
|
||||||
rev = currentRev
|
rev = currentRev
|
||||||
@ -257,9 +241,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
|
|||||||
s.b.ForceCommit()
|
s.b.ForceCommit()
|
||||||
|
|
||||||
keep := s.kvindex.Compact(rev)
|
keep := s.kvindex.Compact(rev)
|
||||||
s.keepMu.Lock()
|
|
||||||
s.keep = keep
|
|
||||||
s.keepMu.Unlock()
|
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
var j = func(ctx context.Context) {
|
var j = func(ctx context.Context) {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
@ -271,9 +252,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
close(ch)
|
close(ch)
|
||||||
s.keepMu.Lock()
|
|
||||||
s.keep = nil
|
|
||||||
s.keepMu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.fifoSched.Schedule(j)
|
s.fifoSched.Schedule(j)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user