diff --git a/storage/kvstore.go b/storage/kvstore.go index 7cc4c19c1..c7af48df9 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -69,7 +69,7 @@ func newStore(path string) *store { func (s *store) Put(key, value []byte) int64 { id := s.TxnBegin() - s.put(key, value, s.currentRev.main+1) + s.put(key, value) s.txnEnd(id) putCounter.Inc() @@ -89,7 +89,7 @@ func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.K func (s *store) DeleteRange(key, end []byte) (n, rev int64) { id := s.TxnBegin() - n = s.deleteRange(key, end, s.currentRev.main+1) + n = s.deleteRange(key, end) s.txnEnd(id) deleteCounter.Inc() @@ -150,7 +150,7 @@ func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, ErrTxnIDMismatch } - s.put(key, value, s.currentRev.main+1) + s.put(key, value) return int64(s.currentRev.main + 1), nil } @@ -161,7 +161,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return 0, 0, ErrTxnIDMismatch } - n = s.deleteRange(key, end, s.currentRev.main+1) + n = s.deleteRange(key, end) if n != 0 || s.currentRev.sub != 0 { rev = int64(s.currentRev.main + 1) } else { @@ -319,9 +319,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage if err := e.Unmarshal(vs[0]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - if e.Type == storagepb.PUT { - kvs = append(kvs, *e.Kv) - } + kvs = append(kvs, *e.Kv) if limit > 0 && len(kvs) >= int(limit) { break } @@ -329,7 +327,8 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage return kvs, rev, nil } -func (s *store) put(key, value []byte, rev int64) { +func (s *store) put(key, value []byte) { + rev := s.currentRev.main + 1 c := rev // if the key exists before, use its previous created @@ -366,9 +365,8 @@ func (s *store) put(key, value []byte, rev int64) { s.currentRev.sub += 1 } -func (s *store) deleteRange(key, end []byte, rev int64) int64 { - var n int64 - rrev := rev +func (s *store) deleteRange(key, end []byte) int64 { + rrev := s.currentRev.main if s.currentRev.sub > 0 { rrev += 1 } @@ -379,45 +377,18 @@ func (s *store) deleteRange(key, end []byte, rev int64) int64 { } for _, key := range keys { - ok := s.delete(key, rev) - if ok { - n++ - } + s.delete(key) } - return n + return int64(len(keys)) } -func (s *store) delete(key []byte, mainrev int64) bool { - grev := mainrev - if s.currentRev.sub > 0 { - grev += 1 - } - rev, _, _, err := s.kvindex.Get(key, grev) - if err != nil { - // key not exist - return false - } +func (s *store) delete(key []byte) { + mainrev := s.currentRev.main + 1 tx := s.b.BatchTx() tx.Lock() defer tx.Unlock() - revbytes := newRevBytes() - revToBytes(rev, revbytes) - - _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) - if len(vs) != 1 { - log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub) - } - - e := &storagepb.Event{} - if err := e.Unmarshal(vs[0]); err != nil { - log.Fatalf("storage: cannot unmarshal event: %v", err) - } - if e.Type == storagepb.DELETE { - return false - } - ibytes := newRevBytes() revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) @@ -439,5 +410,4 @@ func (s *store) delete(key []byte, mainrev int64) bool { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) } s.currentRev.sub += 1 - return true } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index d63e25f42..cec9cc435 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -31,7 +31,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc } } - if len(keys) == 0 { + if len(keys) < int(batchsize) { rbytes := make([]byte, 8+1+8) revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 3e8f2116c..28b315dc6 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -2,46 +2,321 @@ package storage import ( "crypto/rand" + "encoding/binary" + "errors" + "io" + "math" "os" "reflect" "testing" "time" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) -// TODO: improve to a unit test -func TestRangeLimitWhenKeyDeleted(t *testing.T) { - s := newStore(tmpPath) - defer os.Remove(tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) - s.DeleteRange([]byte("foo1"), nil) - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1}, - } - +func TestStorePut(t *testing.T) { tests := []struct { - limit int64 - wkvs []storagepb.KeyValue + rev revision + r indexGetResp + + wrev revision + wev storagepb.Event + wputrev revision }{ - // no limit - {0, kvs}, - {1, kvs[:1]}, - {2, kvs}, - {3, kvs}, + { + revision{1, 0}, + indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, + revision{1, 1}, + storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateIndex: 2, + ModIndex: 2, + Version: 1, + }, + }, + revision{2, 0}, + }, + { + revision{1, 1}, + indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, + revision{1, 2}, + storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateIndex: 2, + ModIndex: 2, + Version: 2, + }, + }, + revision{2, 1}, + }, + { + revision{2, 0}, + indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, + revision{2, 1}, + storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateIndex: 2, + ModIndex: 3, + Version: 3, + }, + }, + revision{3, 0}, + }, } for i, tt := range tests { - kvs, _, err := s.Range([]byte("foo"), []byte("foo3"), tt.limit, 0) + s, b, index := newFakeStore() + s.currentRev = tt.rev + index.indexGetRespc <- tt.r + + s.put([]byte("foo"), []byte("bar")) + + data, err := tt.wev.Marshal() if err != nil { - t.Fatalf("#%d: range error (%v)", i, err) + t.Errorf("#%d: marshal err = %v, want nil", i, err) } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) + wact := []testutil.Action{ + {"put", []interface{}{keyBucketName, newTestBytes(tt.wputrev), data}}, } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) + } + wact = []testutil.Action{ + {"get", []interface{}{[]byte("foo"), tt.wputrev.main}}, + {"put", []interface{}{[]byte("foo"), tt.wputrev}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) + } + if s.currentRev != tt.wrev { + t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) + } + } +} + +func TestStoreRange(t *testing.T) { + ev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateIndex: 1, + ModIndex: 2, + Version: 1, + }, + } + evb, err := ev.Marshal() + if err != nil { + t.Fatal(err) + } + currev := revision{1, 1} + wrev := int64(2) + + tests := []struct { + idxr indexRangeResp + r rangeResp + }{ + { + indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + { + indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, + rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + }, + } + for i, tt := range tests { + s, b, index := newFakeStore() + s.currentRev = currev + b.tx.rangeRespc <- tt.r + index.indexRangeRespc <- tt.idxr + + kvs, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0) + if err != nil { + t.Errorf("#%d: err = %v, want nil", i, err) + } + if w := []storagepb.KeyValue{*ev.Kv}; !reflect.DeepEqual(kvs, w) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) + } + if rev != wrev { + t.Errorf("#%d: rev = %d, want %d", i, rev, wrev) + } + + wact := []testutil.Action{ + {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) + } + wact = []testutil.Action{ + {"range", []interface{}{[]byte("foo"), []byte("goo"), wrev}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) + } + if s.currentRev != currev { + t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) + } + } +} + +func TestStoreDeleteRange(t *testing.T) { + tests := []struct { + rev revision + r indexRangeResp + + wrev revision + wrrev int64 + wdelrev revision + }{ + { + revision{2, 0}, + indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + revision{2, 1}, + 2, + revision{3, 0}, + }, + { + revision{2, 1}, + indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + revision{2, 2}, + 3, + revision{3, 1}, + }, + } + for i, tt := range tests { + s, b, index := newFakeStore() + s.currentRev = tt.rev + index.indexRangeRespc <- tt.r + + n := s.deleteRange([]byte("foo"), []byte("goo")) + if n != 1 { + t.Errorf("#%d: n = %d, want 1", i, n) + } + + data, err := (&storagepb.Event{ + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + }, + }).Marshal() + if err != nil { + t.Errorf("#%d: marshal err = %v, want nil", i, err) + } + wact := []testutil.Action{ + {"put", []interface{}{keyBucketName, newTestBytes(tt.wdelrev), data}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) + } + wact = []testutil.Action{ + {"range", []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}}, + {"tombstone", []interface{}{[]byte("foo"), tt.wdelrev}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) + } + if s.currentRev != tt.wrev { + t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) + } + } +} + +func TestStoreCompact(t *testing.T) { + s, b, index := newFakeStore() + s.currentRev = revision{3, 0} + index.indexCompactRespc <- map[revision]struct{}{revision{1, 0}: {}} + b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}, nil} + + s.Compact(3) + s.wg.Wait() + + if s.compactMainRev != 3 { + t.Errorf("compact main rev = %d, want 3", s.compactMainRev) + } + end := make([]byte, 8) + binary.BigEndian.PutUint64(end, uint64(4)) + wact := []testutil.Action{ + {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}}, + {"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}}, + {"delete", []interface{}{keyBucketName, newTestBytes(revision{2, 0})}}, + {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("tx actions = %+v, want %+v", g, wact) + } + wact = []testutil.Action{ + {"compact", []interface{}{int64(3)}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("index action = %+v, want %+v", g, wact) + } +} + +func TestStoreRestore(t *testing.T) { + s, b, index := newFakeStore() + + putev := storagepb.Event{ + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateIndex: 3, + ModIndex: 3, + Version: 1, + }, + } + putevb, err := putev.Marshal() + if err != nil { + t.Fatal(err) + } + delev := storagepb.Event{ + Type: storagepb.DELETE, + Kv: &storagepb.KeyValue{ + Key: []byte("foo"), + }, + } + delevb, err := delev.Marshal() + if err != nil { + t.Fatal(err) + } + b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}} + b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} + + s.Restore() + + if s.compactMainRev != 2 { + t.Errorf("compact rev = %d, want 4", s.compactMainRev) + } + wrev := revision{4, 0} + if !reflect.DeepEqual(s.currentRev, wrev) { + t.Errorf("current rev = %v, want %v", s.currentRev, wrev) + } + wact := []testutil.Action{ + {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, + {"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, + } + if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("tx actions = %+v, want %+v", g, wact) + } + wact = []testutil.Action{ + {"restore", []interface{}{[]byte("foo"), revision{3, 0}, revision{3, 0}, int64(1)}}, + {"tombstone", []interface{}{[]byte("foo"), revision{4, 0}}}, + } + if g := index.Action(); !reflect.DeepEqual(g, wact) { + t.Errorf("index action = %+v, want %+v", g, wact) } } @@ -101,3 +376,105 @@ func BenchmarkStorePut(b *testing.B) { s.Put(keys[i], []byte("foo")) } } + +func newTestBytes(rev revision) []byte { + bytes := newRevBytes() + revToBytes(rev, bytes) + return bytes +} + +func newFakeStore() (*store, *fakeBackend, *fakeIndex) { + b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} + index := &fakeIndex{ + indexGetRespc: make(chan indexGetResp, 1), + indexRangeRespc: make(chan indexRangeResp, 1), + indexCompactRespc: make(chan map[revision]struct{}, 1), + } + return &store{ + b: b, + kvindex: index, + currentRev: revision{}, + compactMainRev: -1, + }, b, index +} + +type rangeResp struct { + keys [][]byte + vals [][]byte +} + +type fakeBatchTx struct { + testutil.Recorder + rangeRespc chan rangeResp +} + +func (b *fakeBatchTx) Lock() {} +func (b *fakeBatchTx) Unlock() {} +func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {} +func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { + b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucketName, key, value}}) +} +func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { + b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucketName, key, endKey, limit}}) + r := <-b.rangeRespc + return r.keys, r.vals +} +func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) { + b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucketName, key}}) +} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} + +type fakeBackend struct { + tx *fakeBatchTx +} + +func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } +func (b *fakeBackend) Snapshot(w io.Writer) (n int64, err error) { return 0, errors.New("unsupported") } +func (b *fakeBackend) ForceCommit() {} +func (b *fakeBackend) Close() error { return nil } + +type indexGetResp struct { + rev revision + created revision + ver int64 + err error +} + +type indexRangeResp struct { + keys [][]byte + revs []revision +} + +type fakeIndex struct { + testutil.Recorder + indexGetRespc chan indexGetResp + indexRangeRespc chan indexRangeResp + indexCompactRespc chan map[revision]struct{} +} + +func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { + i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}}) + r := <-i.indexGetRespc + return r.rev, r.created, r.ver, r.err +} +func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) { + i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}}) + r := <-i.indexRangeRespc + return r.keys, r.revs +} +func (i *fakeIndex) Put(key []byte, rev revision) { + i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}}) +} +func (i *fakeIndex) Restore(key []byte, created, modified revision, ver int64) { + i.Recorder.Record(testutil.Action{Name: "restore", Params: []interface{}{key, created, modified, ver}}) +} +func (i *fakeIndex) Tombstone(key []byte, rev revision) error { + i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) + return nil +} +func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { + i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}}) + return <-i.indexCompactRespc +} +func (i *fakeIndex) Equal(b index) bool { return false }