storage: save version

This commit is contained in:
Xiang Li 2015-06-29 12:47:17 -07:00
parent bd84e678e6
commit ccca2b04da
6 changed files with 22 additions and 20 deletions

View File

@ -8,7 +8,7 @@ import (
) )
type index interface { type index interface {
Get(key []byte, atRev int64) (rev, created reversion, err error) Get(key []byte, atRev int64) (rev, created reversion, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []reversion) Range(key, end []byte, atRev int64) ([][]byte, []reversion)
Put(key []byte, rev reversion) Put(key []byte, rev reversion)
Tombstone(key []byte, rev reversion) error Tombstone(key []byte, rev reversion) error
@ -42,14 +42,14 @@ func (ti *treeIndex) Put(key []byte, rev reversion) {
okeyi.put(rev.main, rev.sub) okeyi.put(rev.main, rev.sub)
} }
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, err error) { func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, ver int64, err error) {
keyi := &keyIndex{key: key} keyi := &keyIndex{key: key}
ti.RLock() ti.RLock()
defer ti.RUnlock() defer ti.RUnlock()
item := ti.tree.Get(keyi) item := ti.tree.Get(keyi)
if item == nil { if item == nil {
return reversion{}, reversion{}, ErrReversionNotFound return reversion{}, reversion{}, 0, ErrReversionNotFound
} }
keyi = item.(*keyIndex) keyi = item.(*keyIndex)
@ -58,7 +58,7 @@ func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion,
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) { func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) {
if end == nil { if end == nil {
rev, _, err := ti.Get(key, atRev) rev, _, _, err := ti.Get(key, atRev)
if err != nil { if err != nil {
return nil, nil return nil, nil
} }
@ -76,7 +76,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return false return false
} }
curKeyi := item.(*keyIndex) curKeyi := item.(*keyIndex)
rev, _, err := curKeyi.get(atRev) rev, _, _, err := curKeyi.get(atRev)
if err != nil { if err != nil {
return true return true
} }

View File

@ -91,7 +91,7 @@ func TestIndexTombstone(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("tombstone error = %v, want nil", err) t.Errorf("tombstone error = %v, want nil", err)
} }
rev, err := index.Get([]byte("foo"), 7) rev, _, _, err := index.Get([]byte("foo"), 7)
if err != nil { if err != nil {
t.Errorf("get error = %v, want nil", err) t.Errorf("get error = %v, want nil", err)
} }
@ -197,7 +197,7 @@ func TestContinuousCompact(t *testing.T) {
func verify(t *testing.T, index index, tests []T) { func verify(t *testing.T, index index, tests []T) {
for i, tt := range tests { for i, tt := range tests {
h, _, err := index.Get(tt.key, tt.rev) h, _, _, err := index.Get(tt.key, tt.rev)
if err != tt.werr { if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
} }

View File

@ -88,15 +88,15 @@ func (ki *keyIndex) tombstone(main int64, sub int64) {
ki.generations = append(ki.generations, generation{}) ki.generations = append(ki.generations, generation{})
} }
// get gets the modified and created reversion of the key that satisfies the given atRev. // get gets the modified, created reversion and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev. // Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) { func (ki *keyIndex) get(atRev int64) (modified, created reversion, ver int64, err error) {
if ki.isEmpty() { if ki.isEmpty() {
log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key)) log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
} }
g := ki.findGeneration(atRev) g := ki.findGeneration(atRev)
if g.isEmpty() { if g.isEmpty() {
return reversion{}, reversion{}, ErrReversionNotFound return reversion{}, reversion{}, 0, ErrReversionNotFound
} }
f := func(rev reversion) bool { f := func(rev reversion) bool {
@ -108,10 +108,10 @@ func (ki *keyIndex) get(atRev int64) (modified, created reversion, err error) {
n := g.walk(f) n := g.walk(f)
if n != -1 { if n != -1 {
return g.revs[n], g.created, nil return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
} }
return reversion{}, reversion{}, ErrReversionNotFound return reversion{}, reversion{}, 0, ErrReversionNotFound
} }
// compact compacts a keyIndex by removing the versions with smaller or equal // compact compacts a keyIndex by removing the versions with smaller or equal

View File

@ -39,7 +39,7 @@ func TestKeyIndexGet(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
rev, _, err := ki.get(tt.rev) rev, _, _, err := ki.get(tt.rev)
if err != tt.werr { if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
} }

View File

@ -298,7 +298,7 @@ func (s *store) put(key, value []byte, rev int64) {
c := rev c := rev
// if the key exists before, use its previous created // if the key exists before, use its previous created
_, created, err := s.kvindex.Get(key, rev) _, created, ver, err := s.kvindex.Get(key, rev)
if err == nil { if err == nil {
c = created.main c = created.main
} }
@ -306,6 +306,7 @@ func (s *store) put(key, value []byte, rev int64) {
ibytes := newRevBytes() ibytes := newRevBytes()
revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes) revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes)
ver = ver + 1
event := storagepb.Event{ event := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: storagepb.KeyValue{ Kv: storagepb.KeyValue{
@ -313,6 +314,7 @@ func (s *store) put(key, value []byte, rev int64) {
Value: value, Value: value,
CreateIndex: c, CreateIndex: c,
ModIndex: rev, ModIndex: rev,
Version: ver,
}, },
} }
@ -355,7 +357,7 @@ func (s *store) delete(key []byte, mainrev int64) bool {
if s.currentRev.sub > 0 { if s.currentRev.sub > 0 {
grev += 1 grev += 1
} }
rev, _, err := s.kvindex.Get(key, grev) rev, _, _, err := s.kvindex.Get(key, grev)
if err != nil { if err != nil {
// key not exist // key not exist
return false return false

View File

@ -20,9 +20,9 @@ func TestRange(t *testing.T) {
s.Put([]byte("foo1"), []byte("bar1")) s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2")) s.Put([]byte("foo2"), []byte("bar2"))
kvs := []storagepb.KeyValue{ kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar")}, {Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1")}, {Key: []byte("foo1"), Value: []byte("bar1"), CreateIndex: 2, ModIndex: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2")}, {Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
} }
tests := []struct { tests := []struct {
@ -100,8 +100,8 @@ func TestRangeLimit(t *testing.T) {
s.Put([]byte("foo2"), []byte("bar2")) s.Put([]byte("foo2"), []byte("bar2"))
s.DeleteRange([]byte("foo1"), nil) s.DeleteRange([]byte("foo1"), nil)
kvs := []storagepb.KeyValue{ kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar")}, {Key: []byte("foo"), Value: []byte("bar"), CreateIndex: 1, ModIndex: 1, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2")}, {Key: []byte("foo2"), Value: []byte("bar2"), CreateIndex: 3, ModIndex: 3, Version: 1},
} }
tests := []struct { tests := []struct {