diff --git a/storage/index.go b/storage/index.go index 57ab5e137..cb196fed8 100644 --- a/storage/index.go +++ b/storage/index.go @@ -28,7 +28,7 @@ type index interface { Put(key []byte, rev revision) Restore(key []byte, created, modified revision, ver int64) Tombstone(key []byte, rev revision) error - RangeEvents(key, end []byte, rev int64) []revision + RangeSince(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} Equal(b index) bool } @@ -134,10 +134,10 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error { return ki.tombstone(rev.main, rev.sub) } -// RangeEvents returns all revisions from key(including) to end(excluding) +// RangeSince returns all revisions from key(including) to end(excluding) // at or after the given rev. The returned slice is sorted in the order // of revision. -func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision { +func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { ti.RLock() defer ti.RUnlock() diff --git a/storage/index_test.go b/storage/index_test.go index 5acf51836..7ab07486b 100644 --- a/storage/index_test.go +++ b/storage/index_test.go @@ -136,7 +136,7 @@ func TestIndexTombstone(t *testing.T) { } } -func TestIndexRangeEvents(t *testing.T) { +func TestIndexRangeSince(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} @@ -184,7 +184,7 @@ func TestIndexRangeEvents(t *testing.T) { }, } for i, tt := range tests { - revs := index.RangeEvents(tt.key, tt.end, atRev) + revs := index.RangeSince(tt.key, tt.end, atRev) if !reflect.DeepEqual(revs, tt.wrevs) { t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) } diff --git a/storage/kvstore.go b/storage/kvstore.go index f5a9f6e28..042e7b80e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -32,6 +32,13 @@ var ( keyBucketName = []byte("key") metaBucketName = []byte("meta") + // markedRevBytesLen is the byte length of marked revision. + // The first `revBytesLen` bytes represents a normal revision. The last + // one byte is the mark. + markedRevBytesLen = revBytesLen + 1 + markBytePosition = markedRevBytesLen - 1 + markTombstone byte = 't' + scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -193,7 +200,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeEvents gets the events from key to end starting from startRev. +// RangeHistory ranges the history from key to end starting from startRev. // If `end` is nil, the request only observes the events on key. // If `end` is not nil, it observes the events on key range [key, range_end). // Limit limits the number of events returned. @@ -202,28 +209,29 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err // If the required start rev is compacted, ErrCompacted will be returned. // If the required start rev has not happened, ErrFutureRev will be returned. // -// RangeEvents returns events that satisfy the requirement (0 <= n <= limit). -// If events in the revision range have not all happened, it returns immeidately +// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit). +// If history in the revision range has not all happened, it returns immeidately // what is available. // It also returns nextRev which indicates the start revision used for the following // RangeEvents call. The nextRev could be smaller than the given endRev if the store // has not progressed so far or it hits the event limit. // -// TODO: return byte slices instead of events to avoid meaningless encode and decode. -func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []storagepb.Event, nextRev int64, err error) { +// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode. +// This also helps to return raw (key, val) pair directly to make API consistent. +func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) { s.mu.Lock() defer s.mu.Unlock() if startRev > 0 && startRev <= s.compactMainRev { - return nil, 0, ErrCompacted + return nil, nil, 0, ErrCompacted } if startRev > s.currentRev.main { - return nil, 0, ErrFutureRev + return nil, nil, 0, ErrFutureRev } - revs := s.kvindex.RangeEvents(key, end, startRev) + revs := s.kvindex.RangeSince(key, end, startRev) if len(revs) == 0 { - return nil, s.currentRev.main + 1, nil + return nil, nil, s.currentRev.main + 1, nil } tx := s.b.BatchTx() @@ -231,24 +239,24 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []stora defer tx.Unlock() // fetch events from the backend using revisions for _, rev := range revs { - revbytes := newRevBytes() - revToBytes(rev, revbytes) + start, end := revBytesRange(rev) - _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) } - e := storagepb.Event{} - if err := e.Unmarshal(vs[0]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - evs = append(evs, e) - if limit > 0 && len(evs) >= int(limit) { - return evs, rev.main + 1, nil + revbs = append(revbs, ks[0]) + kvs = append(kvs, kv) + if limit > 0 && len(kvs) >= int(limit) { + return revbs, kvs, rev.main + 1, nil } } - return evs, s.currentRev.main + 1, nil + return revbs, kvs, s.currentRev.main + 1, nil } func (s *store) Compact(rev int64) error { @@ -316,21 +324,19 @@ func (s *store) Restore() error { // TODO: limit N to reduce max memory usage keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0) for i, key := range keys { - e := &storagepb.Event{} - if err := e.Unmarshal(vals[i]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vals[i]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - rev := bytesToRev(key) + rev := bytesToRev(key[:revBytesLen]) // restore index - switch e.Type { - case storagepb.PUT: - s.kvindex.Restore(e.Kv.Key, revision{e.Kv.CreateRevision, 0}, rev, e.Kv.Version) - case storagepb.DELETE: - s.kvindex.Tombstone(e.Kv.Key, rev) + switch { + case isTombstone(key): + s.kvindex.Tombstone(kv.Key, rev) default: - log.Panicf("storage: unexpected event type %s", e.Type) + s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) } // update revision @@ -392,19 +398,18 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage } for _, revpair := range revpairs { - revbytes := newRevBytes() - revToBytes(revpair, revbytes) + start, end := revBytesRange(revpair) - _, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub) } - e := &storagepb.Event{} - if err := e.Unmarshal(vs[0]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - kvs = append(kvs, *e.Kv) + kvs = append(kvs, kv) if limit > 0 && len(kvs) >= int(limit) { break } @@ -426,18 +431,15 @@ func (s *store) put(key, value []byte) { revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes) ver = ver + 1 - event := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: key, - Value: value, - CreateRevision: c, - ModRevision: rev, - Version: ver, - }, + kv := storagepb.KeyValue{ + Key: key, + Value: value, + CreateRevision: c, + ModRevision: rev, + Version: ver, } - d, err := event.Marshal() + d, err := kv.Marshal() if err != nil { log.Fatalf("storage: cannot marshal event: %v", err) } @@ -469,15 +471,13 @@ func (s *store) delete(key []byte) { ibytes := newRevBytes() revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) + ibytes = appendMarkTombstone(ibytes) - event := storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: key, - }, + kv := storagepb.KeyValue{ + Key: key, } - d, err := event.Marshal() + d, err := kv.Marshal() if err != nil { log.Fatalf("storage: cannot marshal event: %v", err) } @@ -489,3 +489,29 @@ func (s *store) delete(key []byte) { } s.currentRev.sub += 1 } + +// appendMarkTombstone appends tombstone mark to normal revision bytes. +func appendMarkTombstone(b []byte) []byte { + if len(b) != revBytesLen { + log.Panicf("cannot append mark to non normal revision bytes") + } + return append(b, markTombstone) +} + +// isTombstone checks whether the revision bytes is a tombstone. +func isTombstone(b []byte) bool { + return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone +} + +// revBytesRange returns the range of revision bytes at +// the given revision. +func revBytesRange(rev revision) (start, end []byte) { + start = newRevBytes() + revToBytes(rev, start) + + end = newRevBytes() + endRev := revision{main: rev.main, sub: rev.sub + 1} + revToBytes(endRev, end) + + return start, end +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index eab2db980..0fa253197 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -46,22 +46,21 @@ func TestStorePut(t *testing.T) { r indexGetResp wrev revision - wev storagepb.Event + wkey []byte + wkv storagepb.KeyValue wputrev revision }{ { revision{1, 0}, indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, revision{1, 1}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 2, - Version: 1, - }, + newTestKeyBytes(revision{2, 0}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 2, + Version: 1, }, revision{2, 0}, }, @@ -69,15 +68,13 @@ func TestStorePut(t *testing.T) { revision{1, 1}, indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, revision{1, 2}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 2, - Version: 2, - }, + newTestKeyBytes(revision{2, 1}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 2, + Version: 2, }, revision{2, 1}, }, @@ -85,15 +82,13 @@ func TestStorePut(t *testing.T) { revision{2, 0}, indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, revision{2, 1}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 3, - Version: 3, - }, + newTestKeyBytes(revision{3, 0}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 3, + Version: 3, }, revision{3, 0}, }, @@ -106,12 +101,12 @@ func TestStorePut(t *testing.T) { s.put([]byte("foo"), []byte("bar")) - data, err := tt.wev.Marshal() + data, err := tt.wkv.Marshal() if err != nil { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {"put", []interface{}{keyBucketName, newTestBytes(tt.wputrev), data}}, + {"put", []interface{}{keyBucketName, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -130,17 +125,15 @@ func TestStorePut(t *testing.T) { } func TestStoreRange(t *testing.T) { - ev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - }, + key := newTestKeyBytes(revision{2, 0}, false) + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, } - evb, err := ev.Marshal() + kvb, err := kv.Marshal() if err != nil { t.Fatal(err) } @@ -153,11 +146,11 @@ func TestStoreRange(t *testing.T) { }{ { indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, { indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } for i, tt := range tests { @@ -171,15 +164,16 @@ func TestStoreRange(t *testing.T) { if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []storagepb.KeyValue{*ev.Kv}; !reflect.DeepEqual(kvs, w) { + if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) } if rev != wrev { t.Errorf("#%d: rev = %d, want %d", i, rev, wrev) } + wstart, wend := revBytesRange(tt.idxr.revs[0]) wact := []testutil.Action{ - {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -201,6 +195,7 @@ func TestStoreDeleteRange(t *testing.T) { rev revision r indexRangeResp + wkey []byte wrev revision wrrev int64 wdelrev revision @@ -208,6 +203,7 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 0}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + newTestKeyBytes(revision{3, 0}, true), revision{2, 1}, 2, revision{3, 0}, @@ -215,6 +211,7 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 1}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + newTestKeyBytes(revision{3, 1}, true), revision{2, 2}, 3, revision{3, 1}, @@ -231,17 +228,14 @@ func TestStoreDeleteRange(t *testing.T) { t.Errorf("#%d: n = %d, want 1", i, n) } - data, err := (&storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - }, + data, err := (&storagepb.KeyValue{ + Key: []byte("foo"), }).Marshal() if err != nil { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {"put", []interface{}{keyBucketName, newTestBytes(tt.wdelrev), data}}, + {"put", []interface{}{keyBucketName, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -259,18 +253,16 @@ func TestStoreDeleteRange(t *testing.T) { } } -func TestStoreRangeEvents(t *testing.T) { - ev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - }, +func TestStoreRangeHistory(t *testing.T) { + key := newTestKeyBytes(revision{2, 0}, false) + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, } - evb, err := ev.Marshal() + kvb, err := kv.Marshal() if err != nil { t.Fatal(err) } @@ -282,11 +274,11 @@ func TestStoreRangeEvents(t *testing.T) { }{ { indexRangeEventsResp{[]revision{{2, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, { indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } for i, tt := range tests { @@ -295,12 +287,15 @@ func TestStoreRangeEvents(t *testing.T) { index.indexRangeEventsRespc <- tt.idxr b.tx.rangeRespc <- tt.r - evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1) + keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, w) + if w := [][]byte{key}; !reflect.DeepEqual(keys, w) { + t.Errorf("#%d: keys = %+v, want %+v", i, keys, w) + } + if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) } wact := []testutil.Action{ @@ -309,8 +304,9 @@ func TestStoreRangeEvents(t *testing.T) { if g := index.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } + wstart, wend := revBytesRange(tt.idxr.revs[0]) wact = []testutil.Action{ - {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -325,7 +321,9 @@ func TestStoreCompact(t *testing.T) { s, b, index := newFakeStore() s.currentRev = revision{3, 0} index.indexCompactRespc <- map[revision]struct{}{revision{1, 0}: {}} - b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}, nil} + key1 := newTestKeyBytes(revision{1, 0}, false) + key2 := newTestKeyBytes(revision{2, 0}, false) + b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} s.Compact(3) s.wg.Wait() @@ -336,10 +334,10 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ - {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}}, + {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}}, - {"delete", []interface{}{keyBucketName, newTestBytes(revision{2, 0})}}, - {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}}, + {"delete", []interface{}{keyBucketName, key2}}, + {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -355,33 +353,29 @@ func TestStoreCompact(t *testing.T) { func TestStoreRestore(t *testing.T) { s, b, index := newFakeStore() - putev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 3, - ModRevision: 3, - Version: 1, - }, + putkey := newTestKeyBytes(revision{3, 0}, false) + putkv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 3, + ModRevision: 3, + Version: 1, } - putevb, err := putev.Marshal() + putkvb, err := putkv.Marshal() if err != nil { t.Fatal(err) } - delev := storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - }, + delkey := newTestKeyBytes(revision{4, 0}, true) + delkv := storagepb.KeyValue{ + Key: []byte("foo"), } - delevb, err := delev.Marshal() + delkvb, err := delkv.Marshal() if err != nil { t.Fatal(err) } - b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}} - b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} + b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} s.Restore() @@ -394,7 +388,7 @@ func TestStoreRestore(t *testing.T) { } wact := []testutil.Action{ {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, - {"range", []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, + {"range", []interface{}{keyBucketName, newTestRevBytes(revision{}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, {"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { @@ -410,79 +404,79 @@ func TestStoreRestore(t *testing.T) { } // tests end parameter works well -func TestStoreRangeEventsEnd(t *testing.T) { +func TestStoreRangeHistoryEnd(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("foo1"), []byte("bar1")) s.Put([]byte("foo2"), []byte("bar2")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, false), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { key, end []byte - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue }{ // get no keys { []byte("doo"), []byte("foo"), - nil, + nil, nil, }, // get no keys when key == end { []byte("foo"), []byte("foo"), - nil, + nil, nil, }, // get no keys when ranging single key { []byte("doo"), nil, - nil, + nil, nil, }, // get all keys { []byte("foo"), []byte("foo3"), - evs, + keys, kvs, }, // get partial keys { []byte("foo"), []byte("foo1"), - evs[:1], + keys[:1], kvs[:1], }, // get single key { []byte("foo"), nil, - evs[:1], + keys[:1], kvs[:1], }, } for i, tt := range tests { - evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1) + keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1) if err != nil { t.Fatal(err) } if rev != 4 { t.Errorf("#%d: rev = %d, want %d", i, rev, 4) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } } } -func TestStoreRangeEventsRev(t *testing.T) { +func TestStoreRangeHistoryRev(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) @@ -490,39 +484,39 @@ func TestStoreRangeEventsRev(t *testing.T) { s.DeleteRange([]byte("foo"), nil) s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("unrelated"), []byte("unrelated")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{Key: []byte("foo")}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, true), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo")}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { start int64 - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue wnext int64 }{ - {0, evs, 5}, - {1, evs, 5}, - {3, evs[2:], 5}, + {0, keys, kvs, 5}, + {1, keys, kvs, 5}, + {3, keys[2:], kvs[2:], 5}, } for i, tt := range tests { - evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start) + keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start) if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } if next != tt.wnext { t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) @@ -530,7 +524,7 @@ func TestStoreRangeEventsRev(t *testing.T) { } } -func TestStoreRangeEventsBad(t *testing.T) { +func TestStoreRangeHistoryBad(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) @@ -552,55 +546,55 @@ func TestStoreRangeEventsBad(t *testing.T) { {10, ErrFutureRev}, } for i, tt := range tests { - _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev) + _, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev) if err != tt.werr { t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) } } } -func TestStoreRangeEventsLimit(t *testing.T) { +func TestStoreRangeHistoryLimit(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) s.DeleteRange([]byte("foo"), nil) s.Put([]byte("foo"), []byte("bar")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{Key: []byte("foo")}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, true), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo")}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { limit int64 - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue }{ // no limit - {-1, evs}, + {-1, keys, kvs}, // no limit - {0, evs}, - {1, evs[:1]}, - {2, evs[:2]}, - {3, evs}, - {100, evs}, + {0, keys, kvs}, + {1, keys[:1], kvs[:1]}, + {2, keys[:2], kvs[:2]}, + {3, keys, kvs}, + {100, keys, kvs}, } for i, tt := range tests { - evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1) + keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1) if err != nil { t.Fatalf("#%d: range error (%v)", i, err) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } } } @@ -688,12 +682,21 @@ func BenchmarkStorePut(b *testing.B) { } } -func newTestBytes(rev revision) []byte { +func newTestRevBytes(rev revision) []byte { bytes := newRevBytes() revToBytes(rev, bytes) return bytes } +func newTestKeyBytes(rev revision, tombstone bool) []byte { + bytes := newRevBytes() + revToBytes(rev, bytes) + if tombstone { + bytes = appendMarkTombstone(bytes) + } + return bytes +} + func newFakeStore() (*store, *fakeBackend, *fakeIndex) { b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} index := &fakeIndex{ @@ -792,7 +795,7 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error { i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) return nil } -func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision { +func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision { i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) r := <-i.indexRangeEventsRespc return r.revs diff --git a/storage/revision.go b/storage/revision.go index d5cf173a2..d2b0458d5 100644 --- a/storage/revision.go +++ b/storage/revision.go @@ -16,6 +16,11 @@ package storage import "encoding/binary" +// revBytesLen is the byte length of a normal revision. +// First 8 bytes is the revision.main in big-endian format. The 9th byte +// is a '_'. The last 8 bytes is the revision.sub in big-endian format. +const revBytesLen = 8 + 1 + 8 + type revision struct { main int64 sub int64 @@ -32,7 +37,7 @@ func (a revision) GreaterThan(b revision) bool { } func newRevBytes() []byte { - return make([]byte, 8+1+8) + return make([]byte, revBytesLen, markedRevBytesLen) } func revToBytes(rev revision, bytes []byte) { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 5b7f673c7..96825fbc4 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -247,7 +247,7 @@ func (s *watchableStore) syncWatchings() { if limit == 0 { continue } - evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) + revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur) if err != nil { // TODO: send error event to watching delete(s.unsynced, w) @@ -255,8 +255,19 @@ func (s *watchableStore) syncWatchings() { } // push events to the channel - for _, ev := range evs { - w.ch <- ev + for i, kv := range kvs { + var evt storagepb.Event_EventType + switch { + case isTombstone(revbs[i]): + evt = storagepb.DELETE + default: + evt = storagepb.PUT + } + + w.ch <- storagepb.Event{ + Type: evt, + Kv: &kv, + } pendingEventsGauge.Inc() } // switch to tracking future events if needed