etcd/mvcc/kvstore_test.go
Anthony Romano a375ff172e mvcc: chunk reads for restoring
Loading all keys at once would cause etcd to use twice as much
memory than it would need to serve the keys, causing RSS to spike on
boot. Instead, load the keys into the mvcc by chunk. Uses pipelining
for some concurrency.

Fixes #7822
2017-06-01 14:59:27 -07:00

660 lines
18 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"crypto/rand"
"encoding/binary"
"math"
"os"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/schedule"
"github.com/coreos/etcd/pkg/testutil"
)
func TestStoreRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer s.Close()
defer os.Remove(tmpPath)
for i := 1; i <= 3; i++ {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
if r := s.Rev(); r != int64(i+1) {
t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
}
}
}
func TestStorePut(t *testing.T) {
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 2,
Version: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}
tests := []struct {
rev revision
r indexGetResp
rr *rangeResp
wrev revision
wkey []byte
wkv mvccpb.KeyValue
wputrev revision
}{
{
revision{1, 0},
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
nil,
revision{2, 0},
newTestKeyBytes(revision{2, 0}, false),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
},
revision{2, 0},
},
{
revision{1, 1},
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
revision{2, 0},
newTestKeyBytes(revision{2, 0}, false),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 2,
Lease: 2,
},
revision{2, 0},
},
{
revision{2, 0},
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
revision{3, 0},
newTestKeyBytes(revision{3, 0}, false),
mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 3,
Version: 3,
Lease: 3,
},
revision{3, 0},
},
}
for i, tt := range tests {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main
fi.indexGetRespc <- tt.r
if tt.rr != nil {
b.tx.rangeRespc <- *tt.rr
}
s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
data, err := tt.wkv.Marshal()
if err != nil {
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
wact := []testutil.Action{
{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
}
if tt.rr != nil {
wact = []testutil.Action{
{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
}
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"get", []interface{}{[]byte("foo"), tt.wputrev.main}},
{"put", []interface{}{[]byte("foo"), tt.wputrev}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev.main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
s.Close()
}
}
func TestStoreRange(t *testing.T) {
key := newTestKeyBytes(revision{2, 0}, false)
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 2,
Version: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}
wrev := int64(2)
tests := []struct {
idxr indexRangeResp
r rangeResp
}{
{
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
{
indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
for i, tt := range tests {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = 2
b.tx.rangeRespc <- tt.r
fi.indexRangeRespc <- tt.idxr
ret, err := s.Range([]byte("foo"), []byte("goo"), ro)
if err != nil {
t.Errorf("#%d: err = %v, want nil", i, err)
}
if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
}
if ret.Rev != wrev {
t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
}
wstart, wend := revBytesRange(tt.idxr.revs[0])
wact := []testutil.Action{
{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"range", []interface{}{[]byte("foo"), []byte("goo"), wrev}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != 2 {
t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
}
s.Close()
}
}
func TestStoreDeleteRange(t *testing.T) {
key := newTestKeyBytes(revision{2, 0}, false)
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 2,
Version: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}
tests := []struct {
rev revision
r indexRangeResp
rr rangeResp
wkey []byte
wrev revision
wrrev int64
wdelrev revision
}{
{
revision{2, 0},
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
newTestKeyBytes(revision{3, 0}, true),
revision{3, 0},
2,
revision{3, 0},
},
}
for i, tt := range tests {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = tt.rev.main
fi.indexRangeRespc <- tt.r
b.tx.rangeRespc <- tt.rr
n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
if n != 1 {
t.Errorf("#%d: n = %d, want 1", i, n)
}
data, err := (&mvccpb.KeyValue{
Key: []byte("foo"),
}).Marshal()
if err != nil {
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
wact := []testutil.Action{
{"seqput", []interface{}{keyBucketName, tt.wkey, data}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"range", []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}},
{"tombstone", []interface{}{[]byte("foo"), tt.wdelrev}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev.main {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
}
}
func TestStoreCompact(t *testing.T) {
s := newFakeStore()
defer s.Close()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
s.currentRev = 3
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(revision{1, 0}, false)
key2 := newTestKeyBytes(revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil}
s.Compact(3)
s.fifoSched.WaitFinish(1)
if s.compactMainRev != 3 {
t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
}
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
{"delete", []interface{}{keyBucketName, key2}},
{"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
}
wact = []testutil.Action{
{"compact", []interface{}{int64(3)}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("index action = %+v, want %+v", g, wact)
}
}
func TestStoreRestore(t *testing.T) {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)
putkey := newTestKeyBytes(revision{3, 0}, false)
putkv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 4,
ModRevision: 4,
Version: 1,
}
putkvb, err := putkv.Marshal()
if err != nil {
t.Fatal(err)
}
delkey := newTestKeyBytes(revision{5, 0}, true)
delkv := mvccpb.KeyValue{
Key: []byte("foo"),
}
delkvb, err := delkv.Marshal()
if err != nil {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil}
s.restore()
if s.compactMainRev != 3 {
t.Errorf("compact rev = %d, want 5", s.compactMainRev)
}
if s.currentRev != 5 {
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
}
gens := []generation{
{created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
{created: revision{0, 0}, ver: 0, revs: nil},
}
ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
wact = []testutil.Action{
{"insert", []interface{}{ki}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("index action = %+v, want %+v", g, wact)
}
}
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s0 := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
tx.Unlock()
s0.Close()
s1 := NewStore(b, &lease.FakeLessor{}, nil)
// wait for scheduled compaction to be finished
time.Sleep(100 * time.Millisecond)
if _, err := s1.Range([]byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)
// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
for i := 0; i < 5; i++ {
tx = s1.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
continue
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
}
func TestTxnPut(t *testing.T) {
// assign arbitrary size
bytesN := 30
sliceN := 100
keys := createBytesSlice(bytesN, sliceN)
vals := createBytesSlice(bytesN, sliceN)
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)
for i := 0; i < sliceN; i++ {
txn := s.Write()
base := int64(i + 2)
if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
}
txn.End()
}
}
func TestTxnBlockBackendForceCommit(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
txn := s.Read()
done := make(chan struct{})
go func() {
s.b.ForceCommit()
done <- struct{}{}
}()
select {
case <-done:
t.Fatalf("failed to block ForceCommit")
case <-time.After(100 * time.Millisecond):
}
txn.End()
select {
case <-done:
case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
testutil.FatalStack(t, "failed to execute ForceCommit")
}
}
// TODO: test attach key to lessor
func newTestRevBytes(rev revision) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
return bytes
}
func newTestKeyBytes(rev revision, tombstone bool) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
if tombstone {
bytes = appendMarkTombstone(bytes)
}
return bytes
}
func newFakeStore() *store {
b := &fakeBackend{&fakeBatchTx{
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
fi := &fakeIndex{
Recorder: &testutil.RecorderBuffered{},
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1),
}
s := &store{
b: b,
le: &lease.FakeLessor{},
kvindex: fi,
currentRev: 0,
compactMainRev: -1,
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
}
s.ReadView, s.WriteView = &readView{s}, &writeView{s}
return s
}
type rangeResp struct {
keys [][]byte
vals [][]byte
}
type fakeBatchTx struct {
testutil.Recorder
rangeRespc chan rangeResp
}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
}
func (b *fakeBatchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucketName, key, value}})
}
func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
r := <-b.rangeRespc
return r.keys, r.vals
}
func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) {
b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}})
}
func (b *fakeBatchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
return nil
}
func (b *fakeBatchTx) Commit() {}
func (b *fakeBatchTx) CommitAndStop() {}
type fakeBackend struct {
tx *fakeBatchTx
}
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct {
rev revision
created revision
ver int64
err error
}
type indexRangeResp struct {
keys [][]byte
revs []revision
}
type indexRangeEventsResp struct {
revs []revision
}
type fakeIndex struct {
testutil.Recorder
indexGetRespc chan indexGetResp
indexRangeRespc chan indexRangeResp
indexRangeEventsRespc chan indexRangeEventsResp
indexCompactRespc chan map[revision]struct{}
}
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}})
r := <-i.indexGetRespc
return r.rev, r.created, r.ver, r.err
}
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}})
r := <-i.indexRangeRespc
return r.keys, r.revs
}
func (i *fakeIndex) Put(key []byte, rev revision) {
i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}})
}
func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
return nil
}
func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
r := <-i.indexRangeEventsRespc
return r.revs
}
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Equal(b index) bool { return false }
func (i *fakeIndex) Insert(ki *keyIndex) {
i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
}
func createBytesSlice(bytesN, sliceN int) [][]byte {
rs := [][]byte{}
for len(rs) != sliceN {
v := make([]byte, bytesN)
if _, err := rand.Read(v); err != nil {
panic(err)
}
rs = append(rs, v)
}
return rs
}