From 6556bf164358b74d7d0aed04ce8a4f01724b3f97 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 17 Oct 2015 15:22:11 -0700 Subject: [PATCH] storage: remove the endRev of watcher --- storage/kv_test.go | 17 ++-------- storage/kvstore.go | 8 ++--- storage/kvstore_test.go | 25 ++++++--------- storage/watchable_store.go | 59 +++-------------------------------- storage/watcher.go | 4 +-- storage/watcher_bench_test.go | 2 +- 6 files changed, 19 insertions(+), 96 deletions(-) diff --git a/storage/kv_test.go b/storage/kv_test.go index cfb4bb6e8..67e1bce7a 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -733,7 +733,7 @@ func TestWatchableKVWatch(t *testing.T) { s := newWatchableStore(tmpPath) defer cleanup(s, tmpPath) - wa, cancel := s.Watcher([]byte("foo"), true, 0, 0) + wa, cancel := s.Watcher([]byte("foo"), true, 0) defer cancel() s.Put([]byte("foo"), []byte("bar")) @@ -776,7 +776,7 @@ func TestWatchableKVWatch(t *testing.T) { t.Fatalf("failed to watch the event") } - wa, cancel = s.Watcher([]byte("foo1"), false, 1, 4) + wa, cancel = s.Watcher([]byte("foo1"), false, 1) defer cancel() select { @@ -817,19 +817,6 @@ func TestWatchableKVWatch(t *testing.T) { case <-time.After(time.Second): t.Fatalf("failed to watch the event") } - - select { - case ev := <-wa.Event(): - if !reflect.DeepEqual(ev, storagepb.Event{}) { - t.Errorf("watched event = %+v, want %+v", ev, storagepb.Event{}) - } - if g := wa.Err(); g != ExceedEnd { - t.Errorf("err = %+v, want %+v", g, ExceedEnd) - } - case <-time.After(time.Second): - t.Fatalf("failed to watch the event") - } - } func cleanup(s KV, path string) { diff --git a/storage/kvstore.go b/storage/kvstore.go index a57a63b1e..5dcf10d40 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -197,12 +197,11 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } -// RangeEvents gets the events from key to end in [startRev, endRev). +// RangeEvents gets the events from key to end starting from startRev. // If `end` is nil, the request only observes the events on key. // If `end` is not nil, it observes the events on key range [key, range_end). // Limit limits the number of events returned. // If startRev <=0, rangeEvents returns events from the beginning of uncompacted history. -// If endRev <=0, it indicates there is no end revision. // // If the required start rev is compacted, ErrCompacted will be returned. // If the required start rev has not happened, ErrFutureRev will be returned. @@ -215,7 +214,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err // has not progressed so far or it hits the event limit. // // TODO: return byte slices instead of events to avoid meaningless encode and decode. -func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) { +func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []storagepb.Event, nextRev int64, err error) { s.mu.Lock() defer s.mu.Unlock() @@ -236,9 +235,6 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs defer tx.Unlock() // fetch events from the backend using revisions for _, rev := range revs { - if endRev > 0 && rev.main >= endRev { - return evs, rev.main, nil - } revbytes := newRevBytes() revToBytes(rev, revbytes) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index c8e4df606..8149a55e9 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -295,7 +295,7 @@ func TestStoreRangeEvents(t *testing.T) { index.indexRangeEventsRespc <- tt.idxr b.tx.rangeRespc <- tt.r - evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4) + evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1) if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) } @@ -469,7 +469,7 @@ func TestStoreRangeEventsEnd(t *testing.T) { } for i, tt := range tests { - evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100) + evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1) if err != nil { t.Fatal(err) } @@ -507,24 +507,17 @@ func TestStoreRangeEventsRev(t *testing.T) { tests := []struct { start int64 - end int64 + wevs []storagepb.Event wnext int64 }{ - {1, 1, nil, 1}, - {1, 2, evs[:1], 2}, - {1, 3, evs[:2], 3}, - {1, 4, evs, 5}, - {1, 5, evs, 5}, - {1, 10, evs, 5}, - {3, 4, evs[2:], 5}, - {0, 10, evs, 5}, - {1, 0, evs, 5}, - {0, 0, evs, 5}, + {0, evs, 5}, + {1, evs, 5}, + {3, evs[2:], 5}, } for i, tt := range tests { - evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end) + evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start) if err != nil { t.Fatal(err) } @@ -559,7 +552,7 @@ func TestStoreRangeEventsBad(t *testing.T) { {10, ErrFutureRev}, } for i, tt := range tests { - _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100) + _, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev) if err != tt.werr { t.Errorf("#%d: error = %v, want %v", i, err, tt.werr) } @@ -602,7 +595,7 @@ func TestStoreRangeEventsLimit(t *testing.T) { {100, evs}, } for i, tt := range tests { - evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1, 100) + evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1) if err != nil { t.Fatalf("#%d: range error (%v)", i, err) } diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 84f39f2b5..242901152 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -15,7 +15,6 @@ package storage import ( - "errors" "log" "sync" "time" @@ -23,10 +22,6 @@ import ( "github.com/coreos/etcd/storage/storagepb" ) -// ReachEnd is the error returned by Watcher.Err when watcher reaches its end revision and -// no more event is available. -var ExceedEnd = errors.New("storage: watcher reaches end revision") - type watchableStore struct { mu sync.Mutex @@ -38,10 +33,7 @@ type watchableStore struct { // contains all synced watchers that are tracking the events that will happen // The key of the map is the key that the watcher is watching on. synced map[string][]*watcher - // contains all synced watchers that have an end revision - // The key of the map is the end revision of the watcher. - endm map[int64][]*watcher - tx *ongoingTx + tx *ongoingTx stopc chan struct{} wg sync.WaitGroup @@ -51,7 +43,6 @@ func newWatchableStore(path string) *watchableStore { s := &watchableStore{ KV: newStore(path), synced: make(map[string][]*watcher), - endm: make(map[int64][]*watcher), stopc: make(chan struct{}), } s.wg.Add(1) @@ -160,17 +151,14 @@ func (s *watchableStore) Close() error { return s.KV.Close() } -func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) { +func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() - wa := newWatcher(key, prefix, startRev, endRev) + wa := newWatcher(key, prefix, startRev) k := string(key) if startRev == 0 { s.synced[k] = append(s.synced[k], wa) - if endRev != 0 { - s.endm[endRev] = append(s.endm[endRev], wa) - } } else { slowWatchersGauge.Inc() s.unsynced = append(s.unsynced, wa) @@ -198,13 +186,6 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64 watchersGauge.Dec() } } - if wa.end != 0 { - for i, w := range s.endm[wa.end] { - if w == wa { - s.endm[wa.end] = append(s.endm[wa.end][:i], s.endm[wa.end][i+1:]...) - } - } - } // If we cannot find it, it should have finished watch. }) @@ -248,7 +229,7 @@ func (s *watchableStore) syncWatchers() { nws = append(nws, w) continue } - evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur, w.end) + evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur) if err != nil { w.stopWithError(err) continue @@ -259,17 +240,9 @@ func (s *watchableStore) syncWatchers() { w.ch <- ev pendingEventsGauge.Inc() } - // stop watcher if it reaches the end - if w.end > 0 && nextRev >= w.end { - w.stopWithError(ExceedEnd) - continue - } // switch to tracking future events if needed if nextRev > curRev { s.synced[string(w.key)] = append(s.synced[string(w.key)], w) - if w.end != 0 { - s.endm[w.end] = append(s.endm[w.end], w) - } continue } // put it back to try it in the next round @@ -283,7 +256,6 @@ func (s *watchableStore) syncWatchers() { // handle handles the change of the happening event on all watchers. func (s *watchableStore) handle(rev int64, ev storagepb.Event) { s.notify(rev, ev) - s.stopWatchers(rev) } // notify notifies the fact that given event at the given rev just happened to @@ -304,14 +276,6 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { pendingEventsGauge.Inc() nws = append(nws, w) default: - // put it back to unsynced place - if w.end != 0 { - for i, ew := range s.endm[w.end] { - if ew == w { - s.endm[w.end] = append(s.endm[w.end][:i], s.endm[w.end][i+1:]...) - } - } - } w.cur = rev s.unsynced = append(s.unsynced, w) slowWatchersGauge.Inc() @@ -321,21 +285,6 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { } } -// stopWatchers stops watchers with limit equal to rev. -func (s *watchableStore) stopWatchers(rev int64) { - for i, wa := range s.endm[rev+1] { - k := string(wa.key) - for _, w := range s.synced[k] { - if w == wa { - s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) - watchersGauge.Dec() - } - } - wa.stopWithError(ExceedEnd) - } - delete(s.endm, rev+1) -} - type ongoingTx struct { // keys put/deleted in the ongoing txn putm map[string]bool diff --git a/storage/watcher.go b/storage/watcher.go index aa4092794..5971e9b2c 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -24,19 +24,17 @@ type watcher struct { key []byte prefix bool cur int64 - end int64 ch chan storagepb.Event mu sync.Mutex err error } -func newWatcher(key []byte, prefix bool, start, end int64) *watcher { +func newWatcher(key []byte, prefix bool, start int64) *watcher { return &watcher{ key: key, prefix: prefix, cur: start, - end: end, ch: make(chan storagepb.Event, 10), } } diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go index 818f441cc..fd556a49a 100644 --- a/storage/watcher_bench_test.go +++ b/storage/watcher_bench_test.go @@ -26,6 +26,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) { b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0, 0) + s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0) } }