From 2f74f76025021b462075c597179b819498ef9d7b Mon Sep 17 00:00:00 2001 From: Yicheng Qin <qycqycqycqycqyc@gmail.com> Date: Mon, 9 Nov 2015 17:03:01 -0800 Subject: [PATCH 1/2] storage: remove the event concept from key-value layer The point is to decouple the key-value storage layer and the event notification layer clearly. It gives the watchableKV the flexibility to define whatever event structure it wants without breaking the ondisk format at key-value storage layer. Changes: 1. change the format of key and value stored in backend Store KeyValue struct instead of Event struct in backend value for better abstraction as xiang suggests. And record the corresponded action in the backend key. 2. Remove word 'event' from functions --- storage/index.go | 6 +- storage/index_test.go | 4 +- storage/kvstore.go | 124 ++++++++------ storage/kvstore_test.go | 331 +++++++++++++++++++------------------ storage/revision.go | 7 +- storage/watchable_store.go | 17 +- 6 files changed, 267 insertions(+), 222 deletions(-) diff --git a/storage/index.go b/storage/index.go index 57ab5e137..cb196fed8 100644 --- a/storage/index.go +++ b/storage/index.go @@ -28,7 +28,7 @@ type index interface { Put(key []byte, rev revision) Restore(key []byte, created, modified revision, ver int64) Tombstone(key []byte, rev revision) error - RangeEvents(key, end []byte, rev int64) []revision + RangeSince(key, end []byte, rev int64) []revision Compact(rev int64) map[revision]struct{} Equal(b index) bool } @@ -134,10 +134,10 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error { return ki.tombstone(rev.main, rev.sub) } -// RangeEvents returns all revisions from key(including) to end(excluding) +// RangeSince returns all revisions from key(including) to end(excluding) // at or after the given rev. The returned slice is sorted in the order // of revision. -func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision { +func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { ti.RLock() defer ti.RUnlock() diff --git a/storage/index_test.go b/storage/index_test.go index 5acf51836..7ab07486b 100644 --- a/storage/index_test.go +++ b/storage/index_test.go @@ -136,7 +136,7 @@ func TestIndexTombstone(t *testing.T) { } } -func TestIndexRangeEvents(t *testing.T) { +func TestIndexRangeSince(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} @@ -184,7 +184,7 @@ func TestIndexRangeEvents(t *testing.T) { }, } for i, tt := range tests { - revs := index.RangeEvents(tt.key, tt.end, atRev) + revs := index.RangeSince(tt.key, tt.end, atRev) if !reflect.DeepEqual(revs, tt.wrevs) { t.Errorf("#%d: revs = %+v, want %+v", i, revs, tt.wrevs) } diff --git a/storage/kvstore.go b/storage/kvstore.go index f5a9f6e28..042e7b80e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -32,6 +32,13 @@ var ( keyBucketName = []byte("key") metaBucketName = []byte("meta") + // markedRevBytesLen is the byte length of marked revision. + // The first `revBytesLen` bytes represents a normal revision. The last + // one byte is the mark. + markedRevBytesLen = revBytesLen + 1 + markBytePosition = markedRevBytesLen - 1 + markTombstone byte = 't' + scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -193,7 +200,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeEvents gets the events from key to end starting from startRev. +// RangeHistory ranges the history from key to end starting from startRev. // If `end` is nil, the request only observes the events on key. // If `end` is not nil, it observes the events on key range [key, range_end). // Limit limits the number of events returned. @@ -202,28 +209,29 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err // If the required start rev is compacted, ErrCompacted will be returned. // If the required start rev has not happened, ErrFutureRev will be returned. // -// RangeEvents returns events that satisfy the requirement (0 <= n <= limit). -// If events in the revision range have not all happened, it returns immeidately +// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit). +// If history in the revision range has not all happened, it returns immeidately // what is available. // It also returns nextRev which indicates the start revision used for the following // RangeEvents call. The nextRev could be smaller than the given endRev if the store // has not progressed so far or it hits the event limit. // -// TODO: return byte slices instead of events to avoid meaningless encode and decode. -func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []storagepb.Event, nextRev int64, err error) { +// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode. +// This also helps to return raw (key, val) pair directly to make API consistent. +func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) { s.mu.Lock() defer s.mu.Unlock() if startRev > 0 && startRev <= s.compactMainRev { - return nil, 0, ErrCompacted + return nil, nil, 0, ErrCompacted } if startRev > s.currentRev.main { - return nil, 0, ErrFutureRev + return nil, nil, 0, ErrFutureRev } - revs := s.kvindex.RangeEvents(key, end, startRev) + revs := s.kvindex.RangeSince(key, end, startRev) if len(revs) == 0 { - return nil, s.currentRev.main + 1, nil + return nil, nil, s.currentRev.main + 1, nil } tx := s.b.BatchTx() @@ -231,24 +239,24 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []stora defer tx.Unlock() // fetch events from the backend using revisions for _, rev := range revs { - revbytes := newRevBytes() - revToBytes(rev, revbytes) + start, end := revBytesRange(rev) - _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) } - e := storagepb.Event{} - if err := e.Unmarshal(vs[0]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - evs = append(evs, e) - if limit > 0 && len(evs) >= int(limit) { - return evs, rev.main + 1, nil + revbs = append(revbs, ks[0]) + kvs = append(kvs, kv) + if limit > 0 && len(kvs) >= int(limit) { + return revbs, kvs, rev.main + 1, nil } } - return evs, s.currentRev.main + 1, nil + return revbs, kvs, s.currentRev.main + 1, nil } func (s *store) Compact(rev int64) error { @@ -316,21 +324,19 @@ func (s *store) Restore() error { // TODO: limit N to reduce max memory usage keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0) for i, key := range keys { - e := &storagepb.Event{} - if err := e.Unmarshal(vals[i]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vals[i]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - rev := bytesToRev(key) + rev := bytesToRev(key[:revBytesLen]) // restore index - switch e.Type { - case storagepb.PUT: - s.kvindex.Restore(e.Kv.Key, revision{e.Kv.CreateRevision, 0}, rev, e.Kv.Version) - case storagepb.DELETE: - s.kvindex.Tombstone(e.Kv.Key, rev) + switch { + case isTombstone(key): + s.kvindex.Tombstone(kv.Key, rev) default: - log.Panicf("storage: unexpected event type %s", e.Type) + s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) } // update revision @@ -392,19 +398,18 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage } for _, revpair := range revpairs { - revbytes := newRevBytes() - revToBytes(revpair, revbytes) + start, end := revBytesRange(revpair) - _, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub) } - e := &storagepb.Event{} - if err := e.Unmarshal(vs[0]); err != nil { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { log.Fatalf("storage: cannot unmarshal event: %v", err) } - kvs = append(kvs, *e.Kv) + kvs = append(kvs, kv) if limit > 0 && len(kvs) >= int(limit) { break } @@ -426,18 +431,15 @@ func (s *store) put(key, value []byte) { revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes) ver = ver + 1 - event := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: key, - Value: value, - CreateRevision: c, - ModRevision: rev, - Version: ver, - }, + kv := storagepb.KeyValue{ + Key: key, + Value: value, + CreateRevision: c, + ModRevision: rev, + Version: ver, } - d, err := event.Marshal() + d, err := kv.Marshal() if err != nil { log.Fatalf("storage: cannot marshal event: %v", err) } @@ -469,15 +471,13 @@ func (s *store) delete(key []byte) { ibytes := newRevBytes() revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) + ibytes = appendMarkTombstone(ibytes) - event := storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: key, - }, + kv := storagepb.KeyValue{ + Key: key, } - d, err := event.Marshal() + d, err := kv.Marshal() if err != nil { log.Fatalf("storage: cannot marshal event: %v", err) } @@ -489,3 +489,29 @@ func (s *store) delete(key []byte) { } s.currentRev.sub += 1 } + +// appendMarkTombstone appends tombstone mark to normal revision bytes. +func appendMarkTombstone(b []byte) []byte { + if len(b) != revBytesLen { + log.Panicf("cannot append mark to non normal revision bytes") + } + return append(b, markTombstone) +} + +// isTombstone checks whether the revision bytes is a tombstone. +func isTombstone(b []byte) bool { + return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone +} + +// revBytesRange returns the range of revision bytes at +// the given revision. +func revBytesRange(rev revision) (start, end []byte) { + start = newRevBytes() + revToBytes(rev, start) + + end = newRevBytes() + endRev := revision{main: rev.main, sub: rev.sub + 1} + revToBytes(endRev, end) + + return start, end +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index eab2db980..0fa253197 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -46,22 +46,21 @@ func TestStorePut(t *testing.T) { r indexGetResp wrev revision - wev storagepb.Event + wkey []byte + wkv storagepb.KeyValue wputrev revision }{ { revision{1, 0}, indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, revision{1, 1}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 2, - Version: 1, - }, + newTestKeyBytes(revision{2, 0}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 2, + Version: 1, }, revision{2, 0}, }, @@ -69,15 +68,13 @@ func TestStorePut(t *testing.T) { revision{1, 1}, indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, revision{1, 2}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 2, - Version: 2, - }, + newTestKeyBytes(revision{2, 1}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 2, + Version: 2, }, revision{2, 1}, }, @@ -85,15 +82,13 @@ func TestStorePut(t *testing.T) { revision{2, 0}, indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, revision{2, 1}, - storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 2, - ModRevision: 3, - Version: 3, - }, + newTestKeyBytes(revision{3, 0}, false), + storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 2, + ModRevision: 3, + Version: 3, }, revision{3, 0}, }, @@ -106,12 +101,12 @@ func TestStorePut(t *testing.T) { s.put([]byte("foo"), []byte("bar")) - data, err := tt.wev.Marshal() + data, err := tt.wkv.Marshal() if err != nil { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {"put", []interface{}{keyBucketName, newTestBytes(tt.wputrev), data}}, + {"put", []interface{}{keyBucketName, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -130,17 +125,15 @@ func TestStorePut(t *testing.T) { } func TestStoreRange(t *testing.T) { - ev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - }, + key := newTestKeyBytes(revision{2, 0}, false) + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, } - evb, err := ev.Marshal() + kvb, err := kv.Marshal() if err != nil { t.Fatal(err) } @@ -153,11 +146,11 @@ func TestStoreRange(t *testing.T) { }{ { indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, { indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } for i, tt := range tests { @@ -171,15 +164,16 @@ func TestStoreRange(t *testing.T) { if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []storagepb.KeyValue{*ev.Kv}; !reflect.DeepEqual(kvs, w) { + if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) } if rev != wrev { t.Errorf("#%d: rev = %d, want %d", i, rev, wrev) } + wstart, wend := revBytesRange(tt.idxr.revs[0]) wact := []testutil.Action{ - {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -201,6 +195,7 @@ func TestStoreDeleteRange(t *testing.T) { rev revision r indexRangeResp + wkey []byte wrev revision wrrev int64 wdelrev revision @@ -208,6 +203,7 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 0}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + newTestKeyBytes(revision{3, 0}, true), revision{2, 1}, 2, revision{3, 0}, @@ -215,6 +211,7 @@ func TestStoreDeleteRange(t *testing.T) { { revision{2, 1}, indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + newTestKeyBytes(revision{3, 1}, true), revision{2, 2}, 3, revision{3, 1}, @@ -231,17 +228,14 @@ func TestStoreDeleteRange(t *testing.T) { t.Errorf("#%d: n = %d, want 1", i, n) } - data, err := (&storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - }, + data, err := (&storagepb.KeyValue{ + Key: []byte("foo"), }).Marshal() if err != nil { t.Errorf("#%d: marshal err = %v, want nil", i, err) } wact := []testutil.Action{ - {"put", []interface{}{keyBucketName, newTestBytes(tt.wdelrev), data}}, + {"put", []interface{}{keyBucketName, tt.wkey, data}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -259,18 +253,16 @@ func TestStoreDeleteRange(t *testing.T) { } } -func TestStoreRangeEvents(t *testing.T) { - ev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - }, +func TestStoreRangeHistory(t *testing.T) { + key := newTestKeyBytes(revision{2, 0}, false) + kv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 1, + ModRevision: 2, + Version: 1, } - evb, err := ev.Marshal() + kvb, err := kv.Marshal() if err != nil { t.Fatal(err) } @@ -282,11 +274,11 @@ func TestStoreRangeEvents(t *testing.T) { }{ { indexRangeEventsResp{[]revision{{2, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, { indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}}, + rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } for i, tt := range tests { @@ -295,12 +287,15 @@ func TestStoreRangeEvents(t *testing.T) { index.indexRangeEventsRespc <- tt.idxr b.tx.rangeRespc <- tt.r - evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1) + keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } - if w := []storagepb.Event{ev}; !reflect.DeepEqual(evs, w) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, w) + if w := [][]byte{key}; !reflect.DeepEqual(keys, w) { + t.Errorf("#%d: keys = %+v, want %+v", i, keys, w) + } + if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) } wact := []testutil.Action{ @@ -309,8 +304,9 @@ func TestStoreRangeEvents(t *testing.T) { if g := index.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } + wstart, wend := revBytesRange(tt.idxr.revs[0]) wact = []testutil.Action{ - {"range", []interface{}{keyBucketName, newTestBytes(tt.idxr.revs[0]), []byte(nil), int64(0)}}, + {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) @@ -325,7 +321,9 @@ func TestStoreCompact(t *testing.T) { s, b, index := newFakeStore() s.currentRev = revision{3, 0} index.indexCompactRespc <- map[revision]struct{}{revision{1, 0}: {}} - b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}, nil} + key1 := newTestKeyBytes(revision{1, 0}, false) + key2 := newTestKeyBytes(revision{2, 0}, false) + b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} s.Compact(3) s.wg.Wait() @@ -336,10 +334,10 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ - {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}}, + {"put", []interface{}{metaBucketName, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {"range", []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}}, - {"delete", []interface{}{keyBucketName, newTestBytes(revision{2, 0})}}, - {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}}, + {"delete", []interface{}{keyBucketName, key2}}, + {"put", []interface{}{metaBucketName, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -355,33 +353,29 @@ func TestStoreCompact(t *testing.T) { func TestStoreRestore(t *testing.T) { s, b, index := newFakeStore() - putev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 3, - ModRevision: 3, - Version: 1, - }, + putkey := newTestKeyBytes(revision{3, 0}, false) + putkv := storagepb.KeyValue{ + Key: []byte("foo"), + Value: []byte("bar"), + CreateRevision: 3, + ModRevision: 3, + Version: 1, } - putevb, err := putev.Marshal() + putkvb, err := putkv.Marshal() if err != nil { t.Fatal(err) } - delev := storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: []byte("foo"), - }, + delkey := newTestKeyBytes(revision{4, 0}, true) + delkv := storagepb.KeyValue{ + Key: []byte("foo"), } - delevb, err := delev.Marshal() + delkvb, err := delkv.Marshal() if err != nil { t.Fatal(err) } - b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}} - b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} + b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{2, 0})}} s.Restore() @@ -394,7 +388,7 @@ func TestStoreRestore(t *testing.T) { } wact := []testutil.Action{ {"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}}, - {"range", []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, + {"range", []interface{}{keyBucketName, newTestRevBytes(revision{}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}}, {"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { @@ -410,79 +404,79 @@ func TestStoreRestore(t *testing.T) { } // tests end parameter works well -func TestStoreRangeEventsEnd(t *testing.T) { +func TestStoreRangeHistoryEnd(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("foo1"), []byte("bar1")) s.Put([]byte("foo2"), []byte("bar2")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, false), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { key, end []byte - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue }{ // get no keys { []byte("doo"), []byte("foo"), - nil, + nil, nil, }, // get no keys when key == end { []byte("foo"), []byte("foo"), - nil, + nil, nil, }, // get no keys when ranging single key { []byte("doo"), nil, - nil, + nil, nil, }, // get all keys { []byte("foo"), []byte("foo3"), - evs, + keys, kvs, }, // get partial keys { []byte("foo"), []byte("foo1"), - evs[:1], + keys[:1], kvs[:1], }, // get single key { []byte("foo"), nil, - evs[:1], + keys[:1], kvs[:1], }, } for i, tt := range tests { - evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1) + keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1) if err != nil { t.Fatal(err) } if rev != 4 { t.Errorf("#%d: rev = %d, want %d", i, rev, 4) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } } } -func TestStoreRangeEventsRev(t *testing.T) { +func TestStoreRangeHistoryRev(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) @@ -490,39 +484,39 @@ func TestStoreRangeEventsRev(t *testing.T) { s.DeleteRange([]byte("foo"), nil) s.Put([]byte("foo"), []byte("bar")) s.Put([]byte("unrelated"), []byte("unrelated")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{Key: []byte("foo")}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, true), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo")}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { start int64 - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue wnext int64 }{ - {0, evs, 5}, - {1, evs, 5}, - {3, evs[2:], 5}, + {0, keys, kvs, 5}, + {1, keys, kvs, 5}, + {3, keys[2:], kvs[2:], 5}, } for i, tt := range tests { - evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start) + keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start) if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } if next != tt.wnext { t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) @@ -530,7 +524,7 @@ func TestStoreRangeEventsRev(t *testing.T) { } } -func TestStoreRangeEventsBad(t *testing.T) { +func TestStoreRangeHistoryBad(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) @@ -552,55 +546,55 @@ func TestStoreRangeEventsBad(t *testing.T) { {10, ErrFutureRev}, } for i, tt := range tests { - _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev) + _, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev) if err != tt.werr { t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) } } } -func TestStoreRangeEventsLimit(t *testing.T) { +func TestStoreRangeHistoryLimit(t *testing.T) { s := newStore(tmpPath) defer cleanup(s, tmpPath) s.Put([]byte("foo"), []byte("bar")) s.DeleteRange([]byte("foo"), nil) s.Put([]byte("foo"), []byte("bar")) - evs := []storagepb.Event{ - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - }, - { - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{Key: []byte("foo")}, - }, - { - Type: storagepb.PUT, - Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - }, + keys := [][]byte{ + newTestKeyBytes(revision{1, 0}, false), + newTestKeyBytes(revision{2, 0}, true), + newTestKeyBytes(revision{3, 0}, false), + } + kvs := []storagepb.KeyValue{ + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo")}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, } tests := []struct { limit int64 - wevs []storagepb.Event + wkeys [][]byte + wkvs []storagepb.KeyValue }{ // no limit - {-1, evs}, + {-1, keys, kvs}, // no limit - {0, evs}, - {1, evs[:1]}, - {2, evs[:2]}, - {3, evs}, - {100, evs}, + {0, keys, kvs}, + {1, keys[:1], kvs[:1]}, + {2, keys[:2], kvs[:2]}, + {3, keys, kvs}, + {100, keys, kvs}, } for i, tt := range tests { - evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1) + keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1) if err != nil { t.Fatalf("#%d: range error (%v)", i, err) } - if !reflect.DeepEqual(evs, tt.wevs) { - t.Errorf("#%d: evs = %+v, want %+v", i, evs, tt.wevs) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(kvs, tt.wkvs) { + t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) } } } @@ -688,12 +682,21 @@ func BenchmarkStorePut(b *testing.B) { } } -func newTestBytes(rev revision) []byte { +func newTestRevBytes(rev revision) []byte { bytes := newRevBytes() revToBytes(rev, bytes) return bytes } +func newTestKeyBytes(rev revision, tombstone bool) []byte { + bytes := newRevBytes() + revToBytes(rev, bytes) + if tombstone { + bytes = appendMarkTombstone(bytes) + } + return bytes +} + func newFakeStore() (*store, *fakeBackend, *fakeIndex) { b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}} index := &fakeIndex{ @@ -792,7 +795,7 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error { i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) return nil } -func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision { +func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision { i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) r := <-i.indexRangeEventsRespc return r.revs diff --git a/storage/revision.go b/storage/revision.go index d5cf173a2..47eb6ee0a 100644 --- a/storage/revision.go +++ b/storage/revision.go @@ -16,6 +16,11 @@ package storage import "encoding/binary" +// revBytesLen is the byte length of a normal revision. +// First 8 bytes is the revision.main in big-endian format. The 9th byte +// is a '_'. The last 8 bytes is the revision.sub in big-endian format. +const revBytesLen = 8 + 1 + 8 + type revision struct { main int64 sub int64 @@ -32,7 +37,7 @@ func (a revision) GreaterThan(b revision) bool { } func newRevBytes() []byte { - return make([]byte, 8+1+8) + return make([]byte, revBytesLen) } func revToBytes(rev revision, bytes []byte) { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 5b7f673c7..96825fbc4 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -247,7 +247,7 @@ func (s *watchableStore) syncWatchings() { if limit == 0 { continue } - evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) + revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur) if err != nil { // TODO: send error event to watching delete(s.unsynced, w) @@ -255,8 +255,19 @@ func (s *watchableStore) syncWatchings() { } // push events to the channel - for _, ev := range evs { - w.ch <- ev + for i, kv := range kvs { + var evt storagepb.Event_EventType + switch { + case isTombstone(revbs[i]): + evt = storagepb.DELETE + default: + evt = storagepb.PUT + } + + w.ch <- storagepb.Event{ + Type: evt, + Kv: &kv, + } pendingEventsGauge.Inc() } // switch to tracking future events if needed From 1214f775197e7be7a2b7b337dc4d6f54ffd3deb8 Mon Sep 17 00:00:00 2001 From: Yicheng Qin <qycqycqycqycqyc@gmail.com> Date: Thu, 19 Nov 2015 09:53:31 -0800 Subject: [PATCH 2/2] storage: set revBytes capacity to avoid malloc when appending mark This is a performance optimization. --- storage/revision.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/revision.go b/storage/revision.go index 47eb6ee0a..d2b0458d5 100644 --- a/storage/revision.go +++ b/storage/revision.go @@ -37,7 +37,7 @@ func (a revision) GreaterThan(b revision) bool { } func newRevBytes() []byte { - return make([]byte, revBytesLen) + return make([]byte, revBytesLen, markedRevBytesLen) } func revToBytes(rev revision, bytes []byte) {