From ecc3e15a461e23e0fc258179808417d3e0b40ac0 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 28 Dec 2015 11:12:24 -0800 Subject: [PATCH] storage: delete RangeHistory This has been replaced by operations inside `syncWatchings`. --- storage/kvstore.go | 59 --------- storage/kvstore_test.go | 263 ---------------------------------------- 2 files changed, 322 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 6dc11a4cc..4761b938e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -189,65 +189,6 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeHistory ranges the history from key to end starting from startRev. -// If `end` is nil, the request only observes the events on key. -// If `end` is not nil, it observes the events on key range [key, range_end). -// Limit limits the number of events returned. -// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history. -// -// If the required start rev is compacted, ErrCompacted will be returned. -// If the required start rev has not happened, ErrFutureRev will be returned. -// -// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit). -// If history in the revision range has not all happened, it returns immeidately -// what is available. -// It also returns nextRev which indicates the start revision used for the following -// RangeEvents call. The nextRev could be smaller than the given endRev if the store -// has not progressed so far or it hits the event limit. -// -// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode. -// This also helps to return raw (key, val) pair directly to make API consistent. -func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) { - s.mu.Lock() - defer s.mu.Unlock() - - if startRev > 0 && startRev <= s.compactMainRev { - return nil, nil, 0, ErrCompacted - } - if startRev > s.currentRev.main { - return nil, nil, 0, ErrFutureRev - } - - revs := s.kvindex.RangeSince(key, end, startRev) - if len(revs) == 0 { - return nil, nil, s.currentRev.main + 1, nil - } - - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - // fetch events from the backend using revisions - for _, rev := range revs { - start, end := revBytesRange(rev) - - ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0) - if len(vs) != 1 { - log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) - } - - var kv storagepb.KeyValue - if err := kv.Unmarshal(vs[0]); err != nil { - log.Fatalf("storage: cannot unmarshal event: %v", err) - } - revbs = append(revbs, ks[0]) - kvs = append(kvs, kv) - if limit > 0 && len(kvs) >= int(limit) { - return revbs, kvs, rev.main + 1, nil - } - } - return revbs, kvs, s.currentRev.main + 1, nil -} - func (s *store) Compact(rev int64) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index c179147fc..ccc0376dd 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -263,73 +263,6 @@ func TestStoreDeleteRange(t *testing.T) { } } -func TestStoreRangeHistory(t *testing.T) { - key := newTestKeyBytes(revision{2, 0}, false) - kv := storagepb.KeyValue{ - Key: []byte("foo"), - Value: []byte("bar"), - CreateRevision: 1, - ModRevision: 2, - Version: 1, - } - kvb, err := kv.Marshal() - if err != nil { - t.Fatal(err) - } - currev := revision{2, 0} - - tests := []struct { - idxr indexRangeEventsResp - r rangeResp - }{ - { - indexRangeEventsResp{[]revision{{2, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - }, - { - indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}}, - rangeResp{[][]byte{key}, [][]byte{kvb}}, - }, - } - for i, tt := range tests { - s := newFakeStore() - b := s.b.(*fakeBackend) - fi := s.kvindex.(*fakeIndex) - - s.currentRev = currev - fi.indexRangeEventsRespc <- tt.idxr - b.tx.rangeRespc <- tt.r - - keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1) - if err != nil { - t.Errorf("#%d: err = %v, want nil", i, err) - } - if w := [][]byte{key}; !reflect.DeepEqual(keys, w) { - t.Errorf("#%d: keys = %+v, want %+v", i, keys, w) - } - if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w) - } - - wact := []testutil.Action{ - {"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}}, - } - if g := fi.Action(); !reflect.DeepEqual(g, wact) { - t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) - } - wstart, wend := revBytesRange(tt.idxr.revs[0]) - wact = []testutil.Action{ - {"range", []interface{}{keyBucketName, wstart, wend, int64(0)}}, - } - if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { - t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) - } - if s.currentRev != currev { - t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) - } - } -} - func TestStoreCompact(t *testing.T) { s := newFakeStore() b := s.b.(*fakeBackend) @@ -421,202 +354,6 @@ func TestStoreRestore(t *testing.T) { } } -// tests end parameter works well -func TestStoreRangeHistoryEnd(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, false), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - key, end []byte - wkeys [][]byte - wkvs []storagepb.KeyValue - }{ - // get no keys - { - []byte("doo"), []byte("foo"), - nil, nil, - }, - // get no keys when key == end - { - []byte("foo"), []byte("foo"), - nil, nil, - }, - // get no keys when ranging single key - { - []byte("doo"), nil, - nil, nil, - }, - // get all keys - { - []byte("foo"), []byte("foo3"), - keys, kvs, - }, - // get partial keys - { - []byte("foo"), []byte("foo1"), - keys[:1], kvs[:1], - }, - // get single key - { - []byte("foo"), nil, - keys[:1], kvs[:1], - }, - } - - for i, tt := range tests { - keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1) - if err != nil { - t.Fatal(err) - } - if rev != 4 { - t.Errorf("#%d: rev = %d, want %d", i, rev, 4) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - } -} - -func TestStoreRangeHistoryRev(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.DeleteRange([]byte("foo"), nil) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("unrelated"), []byte("unrelated")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, true), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo")}, - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - start int64 - - wkeys [][]byte - wkvs []storagepb.KeyValue - wnext int64 - }{ - {0, keys, kvs, 5}, - {1, keys, kvs, 5}, - {3, keys[2:], kvs[2:], 5}, - } - - for i, tt := range tests { - keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - if next != tt.wnext { - t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext) - } - } -} - -func TestStoreRangeHistoryBad(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo"), []byte("bar1")) - s.Put([]byte("foo"), []byte("bar2")) - if err := s.Compact(3); err != nil { - t.Fatalf("compact error (%v)", err) - } - - tests := []struct { - rev int64 - werr error - }{ - {1, ErrCompacted}, - {2, ErrCompacted}, - {3, ErrCompacted}, - {4, ErrFutureRev}, - {10, ErrFutureRev}, - } - for i, tt := range tests { - _, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev) - if err != tt.werr { - t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) - } - } -} - -func TestStoreRangeHistoryLimit(t *testing.T) { - s := newStore(tmpPath) - defer cleanup(s, tmpPath) - - s.Put([]byte("foo"), []byte("bar")) - s.DeleteRange([]byte("foo"), nil) - s.Put([]byte("foo"), []byte("bar")) - keys := [][]byte{ - newTestKeyBytes(revision{1, 0}, false), - newTestKeyBytes(revision{2, 0}, true), - newTestKeyBytes(revision{3, 0}, false), - } - kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo")}, - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, - } - - tests := []struct { - limit int64 - wkeys [][]byte - wkvs []storagepb.KeyValue - }{ - // no limit - {-1, keys, kvs}, - // no limit - {0, keys, kvs}, - {1, keys[:1], kvs[:1]}, - {2, keys[:2], kvs[:2]}, - {3, keys, kvs}, - {100, keys, kvs}, - } - for i, tt := range tests { - keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1) - if err != nil { - t.Fatalf("#%d: range error (%v)", i, err) - } - if !reflect.DeepEqual(keys, tt.wkeys) { - t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys) - } - if !reflect.DeepEqual(kvs, tt.wkvs) { - t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs) - } - } -} - func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0 := newStore(tmpPath) defer os.Remove(tmpPath)