From e5b35b82c52b0d5ac8484a7491353c252244c19f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Feb 2016 18:56:36 -0800 Subject: [PATCH] storage: add watchSet and watchSetByKey type --- storage/watchable_store.go | 57 +++++++++++++++------------ storage/watchable_store_bench_test.go | 4 +- storage/watchable_store_test.go | 22 +++++------ 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 7682d2985..482f83917 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -35,6 +35,27 @@ const ( chanBufLen = 1024 ) +type ( + watcherSetByKey map[string]watcherSet + watcherSet map[*watcher]struct{} +) + +func (w watcherSet) add(wa *watcher) { + if _, ok := w[wa]; ok { + panic("add watcher twice!") + } + w[wa] = struct{}{} +} + +func (w watcherSetByKey) add(wa *watcher) { + set := w[string(wa.key)] + if set == nil { + set = make(watcherSet) + w[string(wa.key)] = set + } + set.add(wa) +} + type watchable interface { watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) rev() int64 @@ -46,11 +67,11 @@ type watchableStore struct { *store // contains all unsynced watchers that needs to sync with events that have happened - unsynced map[*watcher]struct{} + unsynced watcherSet // contains all synced watchers that are in sync with the progress of the store. // The key of the map is the key that the watcher watches on. - synced map[string]map[*watcher]struct{} + synced watcherSetByKey stopc chan struct{} wg sync.WaitGroup @@ -63,8 +84,8 @@ type cancelFunc func() func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore { s := &watchableStore{ store: NewStore(b, le), - unsynced: make(map[*watcher]struct{}), - synced: make(map[string]map[*watcher]struct{}), + unsynced: make(watcherSet), + synced: make(watcherSetByKey), stopc: make(chan struct{}), } if s.le != nil { @@ -185,7 +206,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch k := string(key) if startRev == 0 { - if err := unsafeAddWatcher(&s.synced, k, wa); err != nil { + if err := unsafeAddWatcher(s.synced, k, wa); err != nil { log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) } } else { @@ -261,7 +282,7 @@ func (s *watchableStore) syncWatchers() { compactionRev := s.store.compactMainRev // TODO: change unsynced struct type same to this - keyToUnsynced := make(map[string]map[*watcher]struct{}) + keyToUnsynced := make(watcherSetByKey) prefixes := make(map[string]struct{}) for w := range s.unsynced { @@ -282,10 +303,7 @@ func (s *watchableStore) syncWatchers() { minRev = w.cur } - if _, ok := keyToUnsynced[k]; !ok { - keyToUnsynced[k] = make(map[*watcher]struct{}) - } - keyToUnsynced[k][w] = struct{}{} + keyToUnsynced.add(w) if w.prefix { prefixes[k] = struct{}{} @@ -341,7 +359,7 @@ func (s *watchableStore) syncWatchers() { continue } k := string(w.key) - if err := unsafeAddWatcher(&s.synced, k, w); err != nil { + if err := unsafeAddWatcher(s.synced, k, w); err != nil { log.Panicf("error unsafeAddWatcher (%v) for key %s", err, k) } delete(s.unsynced, w) @@ -400,28 +418,17 @@ type watcher struct { // unsafeAddWatcher puts watcher with key k into watchableStore's synced. // Make sure to this is thread-safe using mutex before and after. -func unsafeAddWatcher(synced *map[string]map[*watcher]struct{}, k string, wa *watcher) error { +func unsafeAddWatcher(synced watcherSetByKey, k string, wa *watcher) error { if wa == nil { return fmt.Errorf("nil watcher received") } - mp := *synced - if v, ok := mp[k]; ok { - if _, ok := v[wa]; ok { - return fmt.Errorf("put the same watcher twice: %+v", wa) - } else { - v[wa] = struct{}{} - } - return nil - } - - mp[k] = make(map[*watcher]struct{}) - mp[k][wa] = struct{}{} + synced.add(wa) return nil } // newWatcherToEventMap creates a map that has watcher as key and events as // value. It enables quick events look up by watcher. -func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.Event) map[*watcher][]storagepb.Event { +func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event { watcherToEvents := make(map[*watcher][]storagepb.Event) for _, ev := range evs { key := string(ev.Kv.Key) diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 49f3c0cb6..972f93c4b 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -40,11 +40,11 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // in unsynced for this benchmark. ws := &watchableStore{ store: s, - unsynced: make(map[*watcher]struct{}), + unsynced: make(watcherSet), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string]map[*watcher]struct{}), + synced: make(watcherSetByKey), } defer func() { diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index dc37fd399..d934b3aa5 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -82,11 +82,11 @@ func TestCancelUnsynced(t *testing.T) { // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(map[*watcher]struct{}), + unsynced: make(watcherSet), // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string]map[*watcher]struct{}), + synced: make(watcherSetByKey), } defer func() { @@ -137,8 +137,8 @@ func TestSyncWatchers(t *testing.T) { s := &watchableStore{ store: NewStore(b, &lease.FakeLessor{}), - unsynced: make(map[*watcher]struct{}), - synced: make(map[string]map[*watcher]struct{}), + unsynced: make(watcherSet), + synced: make(watcherSetByKey), } defer func() { @@ -238,7 +238,7 @@ func TestUnsafeAddWatcher(t *testing.T) { // to test if unsafeAddWatcher is correctly updating // synced map when adding new watcher. for i, wa := range ws { - if err := unsafeAddWatcher(&s.synced, string(testKey), wa); err != nil { + if err := unsafeAddWatcher(s.synced, string(testKey), wa); err != nil { t.Errorf("#%d: error = %v, want nil", i, err) } if v, ok := s.synced[string(testKey)]; !ok { @@ -276,14 +276,14 @@ func TestNewMapwatcherToEventMap(t *testing.T) { } tests := []struct { - sync map[string]map[*watcher]struct{} + sync watcherSetByKey evs []storagepb.Event wwe map[*watcher][]storagepb.Event }{ // no watcher in sync, some events should return empty wwe { - map[string]map[*watcher]struct{}{}, + watcherSetByKey{}, evs, map[*watcher][]storagepb.Event{}, }, @@ -291,7 +291,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // one watcher in sync, one event that does not match the key of that // watcher should return empty wwe { - map[string]map[*watcher]struct{}{ + watcherSetByKey{ string(k2): {ws[2]: struct{}{}}, }, evs[:1], @@ -301,7 +301,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // one watcher in sync, one event that matches the key of that // watcher should return wwe with that matching watcher { - map[string]map[*watcher]struct{}{ + watcherSetByKey{ string(k1): {ws[1]: struct{}{}}, }, evs[1:2], @@ -314,7 +314,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // that matches the key of only one of the watcher should return wwe // with the matching watcher { - map[string]map[*watcher]struct{}{ + watcherSetByKey{ string(k0): {ws[0]: struct{}{}}, string(k2): {ws[2]: struct{}{}}, }, @@ -327,7 +327,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) { // two watchers in sync that watches the same key, two events that // match the keys should return wwe with those two watchers { - map[string]map[*watcher]struct{}{ + watcherSetByKey{ string(k0): {ws[0]: struct{}{}}, string(k1): {ws[1]: struct{}{}}, },