From 31c0c5181af6bc84d428baeeb87e5973559c4d92 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 20:06:53 -0800 Subject: [PATCH] storage: make unsync a watcherSetByKey --- storage/watchable_store.go | 63 ++++++++++++++------------- storage/watchable_store_bench_test.go | 2 +- storage/watchable_store_test.go | 4 +- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 02a311b04..fef796008 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -55,6 +55,11 @@ func (w watcherSetByKey) add(wa *watcher) { set.add(wa) } +func (w watcherSetByKey) getSetByKey(key string) (watcherSet, bool) { + set, ok := w[key] + return set, ok +} + func (w watcherSetByKey) delete(wa *watcher) bool { k := string(wa.key) if v, ok := w[k]; ok { @@ -82,7 +87,7 @@ type watchableStore struct { *store // contains all unsynced watchers that needs to sync with events that have happened - unsynced watcherSet + unsynced watcherSetByKey // contains all synced watchers that are in sync with the progress of the store. // The key of the map is the key that the watcher watches on. @@ -99,7 +104,7 @@ type cancelFunc func() func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { s := &watchableStore{ store: NewStore(b, le), - unsynced: make(watcherSet), + unsynced: make(watcherSetByKey), synced: make(watcherSetByKey), stopc: make(chan struct{}), } @@ -223,16 +228,15 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch s.synced.add(wa) } else { slowWatcherGauge.Inc() - s.unsynced[wa] = struct{}{} + s.unsynced.add(wa) } watcherGauge.Inc() cancel := cancelFunc(func() { s.mu.Lock() defer s.mu.Unlock() - // remove global references of the watcher - if _, ok := s.unsynced[wa]; ok { - delete(s.unsynced, wa) + // remove references of the watcher + if s.unsynced.delete(wa) { slowWatcherGauge.Dec() watcherGauge.Dec() return @@ -285,32 +289,29 @@ func (s *watchableStore) syncWatchers() { curRev := s.store.currentRev.main compactionRev := s.store.compactMainRev - // TODO: change unsynced struct type same to this - keyToUnsynced := make(watcherSetByKey) prefixes := make(map[string]struct{}) + for _, set := range s.unsynced { + for w := range set { + k := string(w.key) - for w := range s.unsynced { - k := string(w.key) + if w.cur > curRev { + panic("watcher current revision should not exceed current revision") + } - if w.cur > curRev { - panic("watcher current revision should not exceed current revision") - } + if w.cur < compactionRev { + // TODO: return error compacted to that watcher instead of + // just removing it silently from unsynced. + s.unsynced.delete(w) + continue + } - if w.cur < compactionRev { - // TODO: return error compacted to that watcher instead of - // just removing it silently from unsynced. - delete(s.unsynced, w) - continue - } + if minRev >= w.cur { + minRev = w.cur + } - if minRev >= w.cur { - minRev = w.cur - } - - keyToUnsynced.add(w) - - if w.prefix { - prefixes[k] = struct{}{} + if w.prefix { + prefixes[k] = struct{}{} + } } } @@ -335,7 +336,7 @@ func (s *watchableStore) syncWatchers() { } k := string(kv.Key) - if _, ok := keyToUnsynced[k]; !ok && !matchPrefix(k, prefixes) { + if _, ok := s.unsynced.getSetByKey(k); !ok && !matchPrefix(k, prefixes) { continue } @@ -351,7 +352,7 @@ func (s *watchableStore) syncWatchers() { evs = append(evs, ev) } - for w, es := range newWatcherToEventMap(keyToUnsynced, evs) { + for w, es := range newWatcherToEventMap(s.unsynced, evs) { select { // s.store.Rev also uses Lock, so just return directly case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}: @@ -363,7 +364,7 @@ func (s *watchableStore) syncWatchers() { continue } s.synced.add(w) - delete(s.unsynced, w) + s.unsynced.delete(w) } slowWatcherGauge.Set(float64(len(s.unsynced))) @@ -390,7 +391,7 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { default: // move slow watcher to unsynced w.cur = rev - s.unsynced[w] = struct{}{} + s.unsynced.add(w) delete(wm, w) slowWatcherGauge.Inc() } diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 972f93c4b..91302be68 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -40,7 +40,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // in unsynced for this benchmark. ws := &watchableStore{ store: s, - unsynced: make(watcherSet), + unsynced: make(watcherSetByKey), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 1cd3a31e9..6136d2448 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -82,7 +82,7 @@ func TestCancelUnsynced(t *testing.T) { // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(watcherSet), + unsynced: make(watcherSetByKey), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. @@ -137,7 +137,7 @@ func TestSyncWatchers(t *testing.T) { s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(watcherSet), + unsynced: make(watcherSetByKey), synced: make(watcherSetByKey), }