From 810c3e74a8e479d7752621c05bfb5b241d3b99ba Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 19:15:46 -0800 Subject: [PATCH 1/2] storage: remove unnecessary abstraction --- storage/watchable_store.go | 20 ++--------------- storage/watchable_store_test.go | 39 --------------------------------- 2 files changed, 2 insertions(+), 57 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 482f83917..789ea0f24 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -15,7 +15,6 @@ package storage import ( - "fmt" "log" "math" "strings" @@ -206,9 +205,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch k := string(key) if startRev == 0 { - if err := unsafeAddWatcher(s.synced, k, wa); err != nil { - log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) - } + s.synced.add(wa) } else { slowWatcherGauge.Inc() s.unsynced[wa] = struct{}{} @@ -358,10 +355,7 @@ func (s *watchableStore) syncWatchers() { // will be processed next time and hopefully it will not be full. continue } - k := string(w.key) - if err := unsafeAddWatcher(s.synced, k, w); err != nil { - log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) - } + s.synced.add(w) delete(s.unsynced, w) } @@ -416,16 +410,6 @@ type watcher struct { ch chan<- WatchResponse } -// unsafeAddWatcher puts watcher with key k into watchableStore's synced. -// Make sure to this is thread-safe using mutex before and after. -func unsafeAddWatcher(synced watcherSetByKey, k string, wa *watcher) error { - if wa == nil { - return fmt.Errorf("nil watcher received") - } - synced.add(wa) - return nil -} - // newWatcherToEventMap creates a map that has watcher as key and events as // value. It enables quick events look up by watcher. func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event { diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index d934b3aa5..1cd3a31e9 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -215,45 +215,6 @@ func TestSyncWatchers(t *testing.T) { } } -func TestUnsafeAddWatcher(t *testing.T) { - b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() - testKey := []byte("foo") - testValue := []byte("bar") - s.Put(testKey, testValue, lease.NoLease) - - size := 10 - ws := make([]*watcher, size) - for i := 0; i < size; i++ { - ws[i] = &watcher{ - key: testKey, - prefix: true, - cur: 0, - } - } - // to test if unsafeAddWatcher is correctly updating - // synced map when adding new watcher. - for i, wa := range ws { - if err := unsafeAddWatcher(s.synced, string(testKey), wa); err != nil { - t.Errorf("#%d: error = %v, want nil", i, err) - } - if v, ok := s.synced[string(testKey)]; !ok { - t.Errorf("#%d: ok = %v, want ok true", i, ok) - } else { - if len(v) != i+1 { - t.Errorf("#%d: len(v) = %d, want %d", i, len(v), i+1) - } - if _, ok := v[wa]; !ok { - t.Errorf("#%d: ok = %v, want ok true", i, ok) - } - } - } -} - func TestNewMapwatcherToEventMap(t *testing.T) { k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2") v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2") From 8dc6248aa71193cc025685ce8537f48f84982e07 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 19:28:42 -0800 Subject: [PATCH 2/2] storage: add set delete --- storage/watchable_store.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 789ea0f24..02a311b04 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -55,6 +55,22 @@ func (w watcherSetByKey) add(wa *watcher) { set.add(wa) } +func (w watcherSetByKey) delete(wa *watcher) bool { + k := string(wa.key) + if v, ok := w[k]; ok { + if _, ok := v[wa]; ok { + delete(v, wa) + // if there is nothing in the set, + // remove the set + if len(v) == 0 { + delete(w, k) + } + return true + } + } + return false +} + type watchable interface { watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) rev() int64 @@ -203,7 +219,6 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch ch: ch, } - k := string(key) if startRev == 0 { s.synced.add(wa) } else { @@ -223,16 +238,8 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch return } - if v, ok := s.synced[k]; ok { - if _, ok := v[wa]; ok { - delete(v, wa) - // if there is nothing in s.synced[k], - // remove the key from the synced - if len(v) == 0 { - delete(s.synced, k) - } - watcherGauge.Dec() - } + if s.synced.delete(wa) { + watcherGauge.Dec() } // If we cannot find it, it should have finished watch. })