Merge pull request #3438 from yichengq/storage-test

storage: add mock tests for store struct
This commit is contained in:
Yicheng Qin 2015-09-04 10:26:08 -07:00
commit 5a5f15de39
3 changed files with 416 additions and 69 deletions

View File

@ -69,7 +69,7 @@ func newStore(path string) *store {
func (s *store) Put(key, value []byte) int64 {
id := s.TxnBegin()
s.put(key, value, s.currentRev.main+1)
s.put(key, value)
s.txnEnd(id)
putCounter.Inc()
@ -89,7 +89,7 @@ func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.K
func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
id := s.TxnBegin()
n = s.deleteRange(key, end, s.currentRev.main+1)
n = s.deleteRange(key, end)
s.txnEnd(id)
deleteCounter.Inc()
@ -150,7 +150,7 @@ func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
return 0, ErrTxnIDMismatch
}
s.put(key, value, s.currentRev.main+1)
s.put(key, value)
return int64(s.currentRev.main + 1), nil
}
@ -161,7 +161,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
return 0, 0, ErrTxnIDMismatch
}
n = s.deleteRange(key, end, s.currentRev.main+1)
n = s.deleteRange(key, end)
if n != 0 || s.currentRev.sub != 0 {
rev = int64(s.currentRev.main + 1)
} else {
@ -319,9 +319,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
if err := e.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
if e.Type == storagepb.PUT {
kvs = append(kvs, *e.Kv)
}
kvs = append(kvs, *e.Kv)
if limit > 0 && len(kvs) >= int(limit) {
break
}
@ -329,7 +327,8 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
return kvs, rev, nil
}
func (s *store) put(key, value []byte, rev int64) {
func (s *store) put(key, value []byte) {
rev := s.currentRev.main + 1
c := rev
// if the key exists before, use its previous created
@ -366,9 +365,8 @@ func (s *store) put(key, value []byte, rev int64) {
s.currentRev.sub += 1
}
func (s *store) deleteRange(key, end []byte, rev int64) int64 {
var n int64
rrev := rev
func (s *store) deleteRange(key, end []byte) int64 {
rrev := s.currentRev.main
if s.currentRev.sub > 0 {
rrev += 1
}
@ -379,45 +377,18 @@ func (s *store) deleteRange(key, end []byte, rev int64) int64 {
}
for _, key := range keys {
ok := s.delete(key, rev)
if ok {
n++
}
s.delete(key)
}
return n
return int64(len(keys))
}
func (s *store) delete(key []byte, mainrev int64) bool {
grev := mainrev
if s.currentRev.sub > 0 {
grev += 1
}
rev, _, _, err := s.kvindex.Get(key, grev)
if err != nil {
// key not exist
return false
}
func (s *store) delete(key []byte) {
mainrev := s.currentRev.main + 1
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
revbytes := newRevBytes()
revToBytes(rev, revbytes)
_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
if len(vs) != 1 {
log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
}
e := &storagepb.Event{}
if err := e.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
if e.Type == storagepb.DELETE {
return false
}
ibytes := newRevBytes()
revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
@ -439,5 +410,4 @@ func (s *store) delete(key []byte, mainrev int64) bool {
log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
}
s.currentRev.sub += 1
return true
}

View File

@ -31,7 +31,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
}
}
if len(keys) == 0 {
if len(keys) < int(batchsize) {
rbytes := make([]byte, 8+1+8)
revToBytes(revision{main: compactMainRev}, rbytes)
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)

View File

@ -2,46 +2,321 @@ package storage
import (
"crypto/rand"
"encoding/binary"
"errors"
"io"
"math"
"os"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb"
)
// TODO: improve to a unit test
func TestRangeLimitWhenKeyDeleted(t *testing.T) {
s := newStore(tmpPath)
defer os.Remove(tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.DeleteRange([]byte("foo1"), nil)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
}
func TestStorePut(t *testing.T) {
tests := []struct {
limit int64
wkvs []storagepb.KeyValue
rev revision
r indexGetResp
wrev revision
wev storagepb.Event
wputrev revision
}{
// no limit
{0, kvs},
{1, kvs[:1]},
{2, kvs},
{3, kvs},
{
revision{1, 0},
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
revision{1, 1},
storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateIndex: 2,
ModIndex: 2,
Version: 1,
},
},
revision{2, 0},
},
{
revision{1, 1},
indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
revision{1, 2},
storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateIndex: 2,
ModIndex: 2,
Version: 2,
},
},
revision{2, 1},
},
{
revision{2, 0},
indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
revision{2, 1},
storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateIndex: 2,
ModIndex: 3,
Version: 3,
},
},
revision{3, 0},
},
}
for i, tt := range tests {
kvs, _, err := s.Range([]byte("foo"), []byte("foo3"), tt.limit, 0)
s, b, index := newFakeStore()
s.currentRev = tt.rev
index.indexGetRespc <- tt.r
s.put([]byte("foo"), []byte("bar"))
data, err := tt.wev.Marshal()
if err != nil {
t.Fatalf("#%d: range error (%v)", i, err)
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
wact := []testutil.Action{
{"put", []interface{}{keyBucketName, newTestBytes(tt.wputrev), data}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"get", []interface{}{[]byte("foo"), tt.wputrev.main}},
{"put", []interface{}{[]byte("foo"), tt.wputrev}},
}
if g := index.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
}
}
func TestStoreRange(t *testing.T) {
ev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateIndex: 1,
ModIndex: 2,
Version: 1,
},
}
evb, err := ev.Marshal()
if err != nil {
t.Fatal(err)
}
currev := revision{1, 1}
wrev := int64(2)
tests := []struct {
idxr indexRangeResp
r rangeResp
}{
{
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
},
{
indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
},
}
for i, tt := range tests {
s, b, index := newFakeStore()
s.currentRev = currev
b.tx.rangeRespc <- tt.r
index.indexRangeRespc <- tt.idxr
kvs, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0)
if err != nil {
t.Errorf("#%d: err = %v, want nil", i, err)
}
if w := []storagepb.KeyValue{*ev.Kv}; !reflect.DeepEqual(kvs, w) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
}
if rev != wrev {
t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
}
wact := []testutil.Action{
{"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"range", []interface{}{[]byte("foo"), []byte("goo"), wrev}},
}
if g := index.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != currev {
t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
}
}
}
func TestStoreDeleteRange(t *testing.T) {
tests := []struct {
rev revision
r indexRangeResp
wrev revision
wrrev int64
wdelrev revision
}{
{
revision{2, 0},
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
revision{2, 1},
2,
revision{3, 0},
},
{
revision{2, 1},
indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
revision{2, 2},
3,
revision{3, 1},
},
}
for i, tt := range tests {
s, b, index := newFakeStore()
s.currentRev = tt.rev
index.indexRangeRespc <- tt.r
n := s.deleteRange([]byte("foo"), []byte("goo"))
if n != 1 {
t.Errorf("#%d: n = %d, want 1", i, n)
}
data, err := (&storagepb.Event{
Type: storagepb.DELETE,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
},
}).Marshal()
if err != nil {
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
wact := []testutil.Action{
{"put", []interface{}{keyBucketName, newTestBytes(tt.wdelrev), data}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
wact = []testutil.Action{
{"range", []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}},
{"tombstone", []interface{}{[]byte("foo"), tt.wdelrev}},
}
if g := index.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
if s.currentRev != tt.wrev {
t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
}
}
}
func TestStoreCompact(t *testing.T) {
s, b, index := newFakeStore()
s.currentRev = revision{3, 0}
index.indexCompactRespc <- map[revision]struct{}{revision{1, 0}: {}}
b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}, nil}
s.Compact(3)
s.wg.Wait()
if s.compactMainRev != 3 {
t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
}
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}},
{"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
{"delete", []interface{}{keyBucketName, newTestBytes(revision{2, 0})}},
{"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
}
wact = []testutil.Action{
{"compact", []interface{}{int64(3)}},
}
if g := index.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("index action = %+v, want %+v", g, wact)
}
}
func TestStoreRestore(t *testing.T) {
s, b, index := newFakeStore()
putev := storagepb.Event{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateIndex: 3,
ModIndex: 3,
Version: 1,
},
}
putevb, err := putev.Marshal()
if err != nil {
t.Fatal(err)
}
delev := storagepb.Event{
Type: storagepb.DELETE,
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
},
}
delevb, err := delev.Marshal()
if err != nil {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}}
b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}
s.Restore()
if s.compactMainRev != 2 {
t.Errorf("compact rev = %d, want 4", s.compactMainRev)
}
wrev := revision{4, 0}
if !reflect.DeepEqual(s.currentRev, wrev) {
t.Errorf("current rev = %v, want %v", s.currentRev, wrev)
}
wact := []testutil.Action{
{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
{"range", []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
}
wact = []testutil.Action{
{"restore", []interface{}{[]byte("foo"), revision{3, 0}, revision{3, 0}, int64(1)}},
{"tombstone", []interface{}{[]byte("foo"), revision{4, 0}}},
}
if g := index.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("index action = %+v, want %+v", g, wact)
}
}
@ -101,3 +376,105 @@ func BenchmarkStorePut(b *testing.B) {
s.Put(keys[i], []byte("foo"))
}
}
func newTestBytes(rev revision) []byte {
bytes := newRevBytes()
revToBytes(rev, bytes)
return bytes
}
func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
index := &fakeIndex{
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexCompactRespc: make(chan map[revision]struct{}, 1),
}
return &store{
b: b,
kvindex: index,
currentRev: revision{},
compactMainRev: -1,
}, b, index
}
type rangeResp struct {
keys [][]byte
vals [][]byte
}
type fakeBatchTx struct {
testutil.Recorder
rangeRespc chan rangeResp
}
func (b *fakeBatchTx) Lock() {}
func (b *fakeBatchTx) Unlock() {}
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {}
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}})
}
func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}})
r := <-b.rangeRespc
return r.keys, r.vals
}
func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) {
b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}})
}
func (b *fakeBatchTx) Commit() {}
func (b *fakeBatchTx) CommitAndStop() {}
type fakeBackend struct {
tx *fakeBatchTx
}
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) Snapshot(w io.Writer) (n int64, err error) { return 0, errors.New("unsupported") }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct {
rev revision
created revision
ver int64
err error
}
type indexRangeResp struct {
keys [][]byte
revs []revision
}
type fakeIndex struct {
testutil.Recorder
indexGetRespc chan indexGetResp
indexRangeRespc chan indexRangeResp
indexCompactRespc chan map[revision]struct{}
}
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}})
r := <-i.indexGetRespc
return r.rev, r.created, r.ver, r.err
}
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}})
r := <-i.indexRangeRespc
return r.keys, r.revs
}
func (i *fakeIndex) Put(key []byte, rev revision) {
i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}})
}
func (i *fakeIndex) Restore(key []byte, created, modified revision, ver int64) {
i.Recorder.Record(testutil.Action{Name: "restore", Params: []interface{}{key, created, modified, ver}})
}
func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
return nil
}
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
return <-i.indexCompactRespc
}
func (i *fakeIndex) Equal(b index) bool { return false }