diff --git a/mvcc/index.go b/mvcc/index.go index e8e489b4d..3ff53710d 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -30,6 +30,7 @@ type index interface { RangeSince(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} Equal(b index) bool + Insert(ki *keyIndex) } type treeIndex struct { @@ -215,3 +216,9 @@ func (a *treeIndex) Equal(bi index) bool { return equal } + +func (ti *treeIndex) Insert(ki *keyIndex) { + ti.Lock() + defer ti.Unlock() + ti.tree.ReplaceOrInsert(ki) +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index ff038bc23..28a18a065 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -380,6 +380,11 @@ func (s *store) restore() error { keyToLease := make(map[string]lease.LeaseID) + // use an unordered map to hold the temp index data to speed up + // the initial key index recovery. + // we will convert this unordered map into the tree index later. + unordered := make(map[string]*keyIndex, 100000) + // restore index tx := s.b.BatchTx() tx.Lock() @@ -402,11 +407,20 @@ func (s *store) restore() error { // restore index switch { case isTombstone(key): - s.kvindex.Tombstone(kv.Key, rev) + if ki, ok := unordered[string(kv.Key)]; ok { + ki.tombstone(rev.main, rev.sub) + } delete(keyToLease, string(kv.Key)) default: - s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) + ki, ok := unordered[string(kv.Key)] + if ok { + ki.put(rev.main, rev.sub) + } else { + ki = &keyIndex{key: kv.Key} + ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version) + unordered[string(kv.Key)] = ki + } if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { keyToLease[string(kv.Key)] = lid @@ -419,6 +433,11 @@ func (s *store) restore() error { s.currentRev = rev } + // restore the tree index from the unordered index. + for _, v := range unordered { + s.kvindex.Insert(v) + } + // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. // the correct revision should be set to compaction revision in the case, not the largest revision // we have seen. diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index d7a67e3c2..c111613ce 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -85,3 +85,40 @@ func BenchmarkStoreTxnPut(b *testing.B) { s.TxnEnd(id) } } + +// benchmarkStoreRestore benchmarks the restore operation +func benchmarkStoreRestore(revsPerKey int, b *testing.B) { + var i fakeConsistentIndex + be, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(be, &lease.FakeLessor{}, &i) + defer cleanup(s, be, tmpPath) + + // arbitrary number of bytes + bytesN := 64 + keys := createBytesSlice(bytesN, b.N) + vals := createBytesSlice(bytesN, b.N) + + for i := 0; i < b.N; i++ { + for j := 0; j < revsPerKey; j++ { + id := s.TxnBegin() + if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil { + plog.Fatalf("txn put error: %v", err) + } + s.TxnEnd(id) + } + } + b.ResetTimer() + s = NewStore(be, &lease.FakeLessor{}, &i) +} + +func BenchmarkStoreRestoreRevs1(b *testing.B) { + benchmarkStoreRestore(1, b) +} + +func BenchmarkStoreRestoreRevs10(b *testing.B) { + benchmarkStoreRestore(10, b) +} + +func BenchmarkStoreRestoreRevs20(b *testing.B) { + benchmarkStoreRestore(20, b) +} diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index 72c2ca9cb..195429417 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -405,9 +405,14 @@ func TestStoreRestore(t *testing.T) { if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) } + + gens := []generation{ + {created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}}, + {created: revision{0, 0}, ver: 0, revs: nil}, + } + ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens} wact = []testutil.Action{ - {"restore", []interface{}{[]byte("foo"), revision{4, 0}, revision{3, 0}, int64(1)}}, - {"tombstone", []interface{}{[]byte("foo"), revision{5, 0}}}, + {"insert", []interface{}{ki}}, } if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("index action = %+v, want %+v", g, wact) @@ -668,6 +673,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { } func (i *fakeIndex) Equal(b index) bool { return false } +func (i *fakeIndex) Insert(ki *keyIndex) { + i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}}) +} + func createBytesSlice(bytesN, sliceN int) [][]byte { rs := [][]byte{} for len(rs) != sliceN {