Merge pull request #9511 from jcalvert/index_compaction_breakup

mvcc: Clone for batch index compaction and shorten lock
This commit is contained in:
Gyuho Lee 2018-04-18 15:24:21 -07:00 committed by GitHub
commit e5c9483cd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 27 deletions

View File

@ -34,6 +34,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975).
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/coreos/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572).
- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy-on-write clone of the index tree, so the lock is no longer held while the entire index is traversed.
### Breaking Changes

View File

@ -185,27 +185,34 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{}) available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil { if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev)) ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else { } else {
plog.Printf("store.index: compact %d", rev) plog.Printf("store.index: compact %d", rev)
} }
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock() ti.Lock()
defer ti.Unlock() clone := ti.tree.Clone()
ti.tree.Ascend(compactIndex(rev, available, &emptyki)) ti.Unlock()
for _, ki := range emptyki {
item := ti.tree.Delete(ki) clone.Ascend(func(item btree.Item) bool {
if item == nil { keyi := item.(*keyIndex)
if ti.lg != nil { //Lock is needed here to prevent modification to the keyIndex while
ti.lg.Panic("failed to delete during compaction") //compaction is going on or revision added to empty before deletion
} else { ti.Lock()
plog.Panic("store.index: unexpected delete failure during compaction") keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
} }
} }
} ti.Unlock()
return true
})
return available return available
} }
@ -222,17 +229,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available return available
} }
// compactIndex builds the visitor callback handed to the tree's Ascend
// during index compaction.
//
// For every visited item the callback asserts it to *keyIndex, calls
// compact(rev, available) on it, and, if the keyIndex then reports isEmpty,
// appends it to the slice pointed to by emptyki. The callback always returns
// true.
func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
	visit := func(item btree.Item) bool {
		ki := item.(*keyIndex)
		ki.compact(rev, available)
		if !ki.isEmpty() {
			return true
		}
		// Record the emptied entry through the caller-supplied pointer.
		*emptyki = append(*emptyki, ki)
		return true
	}
	return visit
}
func (ti *treeIndex) Equal(bi index) bool { func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex) b := bi.(*treeIndex)

42
mvcc/index_bench_test.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"testing"
"go.uber.org/zap"
)
// BenchmarkIndexCompact1 runs benchmarkIndexCompact with a size of 1.
func BenchmarkIndexCompact1(b *testing.B) {
	benchmarkIndexCompact(b, 1)
}

// BenchmarkIndexCompact100 runs benchmarkIndexCompact with a size of 100.
func BenchmarkIndexCompact100(b *testing.B) {
	benchmarkIndexCompact(b, 100)
}

// BenchmarkIndexCompact10000 runs benchmarkIndexCompact with a size of 10000.
func BenchmarkIndexCompact10000(b *testing.B) {
	benchmarkIndexCompact(b, 10000)
}

// BenchmarkIndexCompact100000 runs benchmarkIndexCompact with a size of 100000.
func BenchmarkIndexCompact100000(b *testing.B) {
	benchmarkIndexCompact(b, 100000)
}

// BenchmarkIndexCompact1000000 runs benchmarkIndexCompact with a size of 1000000.
func BenchmarkIndexCompact1000000(b *testing.B) {
	benchmarkIndexCompact(b, 1000000)
}
// benchmarkIndexCompact measures treeIndex.Compact on an index pre-populated
// with one entry per key returned by createBytesSlice(64, size).
//
// Setup (untimed): each key is Put at a distinct revision, numbered from 1.
// Timed section: Compact is called exactly b.N times, with revisions
// 1, 2, ..., b.N.
//
// Revisions stay 1-based, as in the original loop.
// NOTE(review): presumably a zero revision is not a valid Put; confirm
// against keyIndex.
func benchmarkIndexCompact(b *testing.B, size int) {
	log := zap.NewNop()
	kvindex := newTreeIndex(log)

	bytesN := 64
	keys := createBytesSlice(bytesN, size)
	// Index every generated key. The previous loop started at index 1 and
	// skipped keys[0], leaving the size-1 benchmark with an empty index.
	for i, key := range keys {
		rev := int64(i + 1)
		kvindex.Put(key, revision{main: rev, sub: rev})
	}

	b.ResetTimer()
	// Run exactly b.N compactions so ns/op is not skewed. The previous loop
	// ran b.N-1 of them, and none at all when b.N == 1.
	for i := 0; i < b.N; i++ {
		kvindex.Compact(int64(i + 1))
	}
}

View File

@ -217,17 +217,18 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
func (s *store) Compact(rev int64) (<-chan struct{}, error) { func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock() s.revMu.Lock()
defer s.revMu.Unlock()
if rev <= s.compactMainRev { if rev <= s.compactMainRev {
ch := make(chan struct{}) ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f) s.fifoSched.Schedule(f)
s.mu.Unlock()
s.revMu.Unlock()
return ch, ErrCompacted return ch, ErrCompacted
} }
if rev > s.currentRev { if rev > s.currentRev {
s.mu.Unlock()
s.revMu.Unlock()
return nil, ErrFutureRev return nil, ErrFutureRev
} }
@ -245,6 +246,8 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted // ensure that desired compaction is persisted
s.b.ForceCommit() s.b.ForceCommit()
s.mu.Unlock()
s.revMu.Unlock()
keep := s.kvindex.Compact(rev) keep := s.kvindex.Compact(rev)
ch := make(chan struct{}) ch := make(chan struct{})
var j = func(ctx context.Context) { var j = func(ctx context.Context) {