From 163fd2d76b75e0d04d0dfbcd9c7c02a375a70d51 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 8 May 2017 19:29:53 -0700
Subject: [PATCH 1/2] mvcc: chunk reads for restoring

Loading all keys at once would cause etcd to use twice as much memory
as it needs to serve the keys, causing RSS to spike on boot. Instead,
load the keys into the mvcc store in chunks. Uses pipelining for some
concurrency.

Fixes #7822
---
 mvcc/kvstore.go      | 140 ++++++++++++++++++++++++-------------------
 mvcc/kvstore_test.go |   6 +-
 2 files changed, 84 insertions(+), 62 deletions(-)

diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 36b3d9a26..e5260638c 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -33,13 +33,6 @@ var (
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")
 
-	// markedRevBytesLen is the byte length of marked revision.
-	// The first `revBytesLen` bytes represents a normal revision. The last
-	// one byte is the mark.
-	markedRevBytesLen = revBytesLen + 1
-	markBytePosition  = markedRevBytesLen - 1
-	markTombstone byte = 't'
-
 	consistentIndexKeyName  = []byte("consistent_index")
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
@@ -52,6 +45,17 @@ var (
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )
 
+const (
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen = revBytesLen + 1
+	markBytePosition  = markedRevBytesLen - 1
+	markTombstone byte = 't'
+
+	restoreChunkKeys = 10000
+)
+
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 type ConsistentIndexGetter interface {
@@ -247,11 +251,6 @@ func (s *store) restore() error {
 
 	keyToLease := make(map[string]lease.LeaseID)
 
-	// use an unordered map to hold the temp index data to speed up
-	// the initial key index recovery.
-	// we will convert this unordered map into the tree index later.
-	unordered := make(map[string]*keyIndex, 100000)
-
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -260,48 +259,41 @@ func (s *store) restore() error {
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 		plog.Printf("restore compact to %d", s.compactMainRev)
 	}
-
-	// TODO: limit N to reduce max memory usage
-	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
-	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-
-		// restore index
-		switch {
-		case isTombstone(key):
-			if ki, ok := unordered[string(kv.Key)]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, string(kv.Key))
-
-		default:
-			ki, ok := unordered[string(kv.Key)]
-			if ok {
-				ki.put(rev.main, rev.sub)
-			} else {
-				ki = &keyIndex{key: kv.Key}
-				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-				unordered[string(kv.Key)] = ki
-			}
-
-			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-				keyToLease[string(kv.Key)] = lid
-			} else {
-				delete(keyToLease, string(kv.Key))
-			}
-		}
-	}
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+	scheduledCompact := int64(0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+	}
 
-	// restore the tree index from the unordered index.
-	for _, v := range unordered {
-		s.kvindex.Insert(v)
+	// index keys concurrently as they're loaded in from tx
+	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
+	go func() {
+		defer close(donec)
+		for unordered := range unorderedc {
+			// restore the tree index from the unordered index.
+			for _, v := range unordered {
+				s.kvindex.Insert(v)
+			}
+		}
+	}()
+	for {
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
+		if len(keys) == 0 {
+			break
+		}
+		// unbuffered so keys don't pile up in memory
+		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		if len(keys) < restoreChunkKeys {
+			// partial set implies final set
+			break
+		}
+		// next set begins after where this one ended
+		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+		newMin.sub++
+		revToBytes(newMin, min)
 	}
+	close(unorderedc)
+	<-donec
 
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -309,6 +301,9 @@ func (s *store) restore() error {
 	if s.currentRev < s.compactMainRev {
 		s.currentRev = s.compactMainRev
 	}
+	if scheduledCompact <= s.compactMainRev {
+		scheduledCompact = 0
+	}
 
 	for key, lid := range keyToLease {
 		if s.le == nil {
@@ -320,15 +315,6 @@ func (s *store) restore() error {
 		}
 	}
 
-	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
-	scheduledCompact := int64(0)
-	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
-		if scheduledCompact <= s.compactMainRev {
-			scheduledCompact = 0
-		}
-	}
-
 	tx.Unlock()
 
 	if scheduledCompact != 0 {
@@ -339,6 +325,40 @@ func (s *store) restore() error {
 	return nil
 }
 
+func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
+	// assume half of keys are overwrites
+	unordered := make(map[string]*keyIndex, len(keys)/2)
+	for i, key := range keys {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
+		kstr := string(kv.Key)
+		if isTombstone(key) {
+			if ki, ok := unordered[kstr]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
+			delete(keyToLease, kstr)
+			continue
+		}
+		if ki, ok := unordered[kstr]; ok {
+			ki.put(rev.main, rev.sub)
+		} else {
+			ki = &keyIndex{key: kv.Key}
+			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+			unordered[kstr] = ki
+		}
+		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+			keyToLease[kstr] = lid
+		} else {
+			delete(keyToLease, kstr)
+		}
+	}
+	return unordered
+}
+
 func (s *store) Close() error {
 	close(s.stopc)
 	s.fifoSched.Stop()
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index f1e8167c3..2d85d8baf 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 
 	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
+	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
+	b.tx.rangeRespc <- rangeResp{nil, nil}
+
 	s.restore()
 
 	if s.compactMainRev != 3 {
@@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) {
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
-		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
+		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 		t.Errorf("tx actions = %+v, want %+v", g, wact)

From 1aca63e9e0ce1022718d4a0e9943663306d6a91d Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 8 May 2017 21:45:43 -0700
Subject: [PATCH 2/2] mvcc: time restore in restore benchmark

This never worked; b.ResetTimer() was the last statement in the
benchmark, so the restore path was never actually measured.
---
 mvcc/kvstore_bench_test.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go
index 821fb6755..33fdc17fc 100644
--- a/mvcc/kvstore_bench_test.go
+++ b/mvcc/kvstore_bench_test.go
@@ -89,7 +89,8 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(be, &lease.FakeLessor{}, &i)
-	defer cleanup(s, be, tmpPath)
+	// use closure to capture 's' to pick up the reassignment
+	defer func() { cleanup(s, be, tmpPath) }()
 
 	// arbitrary number of bytes
 	bytesN := 64
@@ -103,7 +104,11 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 			txn.End()
 		}
 	}
+	s.Close()
+
+	b.ReportAllocs()
 	b.ResetTimer()
+	s = NewStore(be, &lease.FakeLessor{}, &i)
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {