Merge pull request #3073 from xiang90/storage_ver

storage: save version
This commit is contained in:
Xiang Li 2015-06-29 13:19:02 -07:00
commit 8d3e3ff25a
6 changed files with 22 additions and 20 deletions

View File

@ -8,7 +8,7 @@ import (
)
type index interface {
Get(key []byte, atRev int64) (rev, created reversion, err error)
Get(key []byte, atRev int64) (rev, created reversion, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []reversion)
Put(key []byte, rev reversion)
Tombstone(key []byte, rev reversion) error
@ -42,14 +42,14 @@ func (ti *treeIndex) Put(key []byte, rev reversion) {
okeyi.put(rev.main, rev.sub)
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, err error) {
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, ver int64, err error) {
keyi := &keyIndex{key: key}
ti.RLock()
defer ti.RUnlock()
item := ti.tree.Get(keyi)
if item == nil {
return reversion{}, reversion{}, ErrReversionNotFound
return reversion{}, reversion{}, 0, ErrReversionNotFound
}
keyi = item.(*keyIndex)
@ -58,7 +58,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion,
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) {
if end == nil {
rev, _, err := ti.Get(key, atRev)
rev, _, _, err := ti.Get(key, atRev)
if err != nil {
return nil, nil
}
@ -76,7 +76,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return false
}
curKeyi := item.(*keyIndex)
rev, _, err := curKeyi.get(atRev)
rev, _, _, err := curKeyi.get(atRev)
if err != nil {
return true
}

View File

@ -91,7 +91,7 @@ func TestIndexTombstone(t *testing.T) {
if err != nil {
t.Errorf("tombstone error = %v, want nil", err)
}
rev, err := index.Get([]byte("foo"), 7)
rev, _, _, err := index.Get([]byte("foo"), 7)
if err != nil {
t.Errorf("get error = %v, want nil", err)
}
@ -197,7 +197,7 @@ func TestContinuousCompact(t *testing.T) {
func verify(t *testing.T, index index, tests []T) {
for i, tt := range tests {
h, _, err := index.Get(tt.key, tt.rev)
h, _, _, err := index.Get(tt.key, tt.rev)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}

View File

@ -88,15 +88,15 @@ func (ki *keyIndex) tombstone(main int64, sub int64) {
ki.generations = append(ki.generations, generation{})
}
// get gets the modified and created reversion of the key that satisfies the given atRev.
// get gets the modified, created reversion and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) {
func (ki *keyIndex) get(atRev int64) (modified, created reversion, ver int64, err error) {
if ki.isEmpty() {
log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
return reversion{}, reversion{}, ErrReversionNotFound
return reversion{}, reversion{}, 0, ErrReversionNotFound
}
f := func(rev reversion) bool {
@ -108,10 +108,10 @@ func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) {
n := g.walk(f)
if n != -1 {
return g.revs[n], g.created, nil
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}
return reversion{}, reversion{}, ErrReversionNotFound
return reversion{}, reversion{}, 0, ErrReversionNotFound
}
// compact compacts a keyIndex by removing the versions with smaller or equal

View File

@ -39,7 +39,7 @@ func TestKeyIndexGet(t *testing.T) {
}
for i, tt := range tests {
rev, _, err := ki.get(tt.rev)
rev, _, _, err := ki.get(tt.rev)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}

View File

@ -298,7 +298,7 @@ func (s *store) put(key, value []byte, rev int64) {
c := rev
// if the key exists before, use its previous created
_, created, err := s.kvindex.Get(key, rev)
_, created, ver, err := s.kvindex.Get(key, rev)
if err == nil {
c = created.main
}
@ -306,6 +306,7 @@ func (s *store) put(key, value []byte, rev int64) {
ibytes := newRevBytes()
revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
ver = ver + 1
event := storagepb.Event{
Type: storagepb.PUT,
Kv: storagepb.KeyValue{
@ -313,6 +314,7 @@ func (s *store) put(key, value []byte, rev int64) {
Value: value,
CreateIndex: c,
ModIndex: rev,
Version: ver,
},
}
@ -355,7 +357,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
if s.currentRev.sub > 0 {
grev += 1
}
rev, _, err := s.kvindex.Get(key, grev)
rev, _, _, err := s.kvindex.Get(key, grev)
if err != nil {
// key not exist
return false

View File

@ -20,9 +20,9 @@ func TestRange(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar")},
{Key: []byte("foo1"), Value: []byte("bar1")},
{Key: []byte("foo2"), Value: []byte("bar2")},
{Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateIndex: 2, ModIndex: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
}
tests := []struct {
@ -100,8 +100,8 @@ func TestRangeLimit(t *testing.T) {
s.Put([]byte("foo2"), []byte("bar2"))
s.DeleteRange([]byte("foo1"), nil)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar")},
{Key: []byte("foo2"), Value: []byte("bar2")},
{Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
}
tests := []struct {