From 48aebd9b09c2c7984c76ba083761b27e934798ff Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sat, 21 Nov 2015 00:42:09 -0800 Subject: [PATCH] storage: use map for watchableStore synced This is for coreos#3859 switching slice to map for synced watchings. For a large amount of synced watchings, map implementation performs better. When putting 1 million watchers on the same key and canceling them one by one: original implementation takes 9m7.268221091s, while the one with map takes only 430.531637ms. --- storage/watchable_store.go | 79 +++++++++++++++------- storage/watchable_store_bench_test.go | 38 ++++++++++- storage/watchable_store_test.go | 97 +++++++++++++++++++++++++++ 3 files changed, 189 insertions(+), 25 deletions(-) create mode 100644 storage/watchable_store_test.go diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 96825fbc4..96f0688d1 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -15,6 +15,7 @@ package storage import ( + "fmt" "log" "sync" "time" @@ -44,7 +45,7 @@ type watchableStore struct { // contains all synced watching that are tracking the events that will happen // The key of the map is the key that the watching is watching on. - synced map[string][]*watching + synced map[string]map[*watching]struct{} tx *ongoingTx stopc chan struct{} @@ -55,7 +56,7 @@ func newWatchableStore(path string) *watchableStore { s := &watchableStore{ store: newStore(path), unsynced: make(map[*watching]struct{}), - synced: make(map[string][]*watching), + synced: make(map[string]map[*watching]struct{}), stopc: make(chan struct{}), } s.wg.Add(1) @@ -185,7 +186,9 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan< k := string(key) if startRev == 0 { - s.synced[k] = append(s.synced[k], wa) + if err := unsafeAddWatching(&s.synced, k, wa); err != nil { + log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) + } } else { slowWatchingGauge.Inc() s.unsynced[wa] = struct{}{} @@ -203,9 +206,14 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan< return } - for i, w := range s.synced[k] { - if w == wa { - s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) + if v, ok := s.synced[k]; ok { + if _, ok := v[wa]; ok { + delete(v, wa) + // if there is nothing in s.synced[k], + // remove the key from the synced + if len(v) == 0 { + delete(s.synced, k) + } watchingGauge.Dec() } } @@ -272,7 +280,10 @@ func (s *watchableStore) syncWatchings() { } // switch to tracking future events if needed if nextRev > curRev { - s.synced[string(w.key)] = append(s.synced[string(w.key)], w) + k := string(w.key) + if err := unsafeAddWatching(&s.synced, k, w); err != nil { + log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) + } delete(s.unsynced, w) continue } @@ -292,25 +303,25 @@ func (s *watchableStore) handle(rev int64, ev storagepb.Event) { func (s *watchableStore) notify(rev int64, ev storagepb.Event) { // check all prefixes of the key to notify all corresponded watchings for i := 0; i <= len(ev.Kv.Key); i++ { - ws := s.synced[string(ev.Kv.Key[:i])] - nws := ws[:0] - for _, w := range ws { - // the watching needs to be notified when either it watches prefix or - // the key is exactly matched. - if !w.prefix && i != len(ev.Kv.Key) { - continue - } - select { - case w.ch <- ev: - pendingEventsGauge.Inc() - nws = append(nws, w) - default: - w.cur = rev - s.unsynced[w] = struct{}{} - slowWatchingGauge.Inc() + k := string(ev.Kv.Key[:i]) + if wm, ok := s.synced[k]; ok { + for w := range wm { + // the watching needs to be notified when either it watches prefix or + // the key is exactly matched. + if !w.prefix && i != len(ev.Kv.Key) { + continue + } + select { + case w.ch <- ev: + pendingEventsGauge.Inc() + default: + w.cur = rev + s.unsynced[w] = struct{}{} + delete(wm, w) + slowWatchingGauge.Inc() + } } } - s.synced[string(ev.Kv.Key[:i])] = nws } } @@ -356,3 +367,23 @@ type watching struct { // The chan might be shared with other watchings. ch chan<- storagepb.Event } + +// unsafeAddWatching puts watching with key k into watchableStore's synced. +// Make sure to this is thread-safe using mutex before and after. +func unsafeAddWatching(synced *map[string]map[*watching]struct{}, k string, wa *watching) error { + if wa == nil { + return fmt.Errorf("nil watching received") + } + mp := *synced + if v, ok := mp[k]; ok { + if _, ok := v[wa]; ok { + return fmt.Errorf("put the same watch twice: %+v", wa) + } else { + v[wa] = struct{}{} + } + + } + mp[k] = make(map[*watching]struct{}) + mp[k][wa] = struct{}{} + return nil +} diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 437022f9a..a4d6ad861 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -44,7 +44,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. - synced: make(map[string][]*watching), + synced: make(map[string]map[*watching]struct{}), } defer func() { @@ -81,3 +81,39 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { cancels[idx]() } } + +func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { + s := newWatchableStore(tmpPath) + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + // Put a key so that we can spawn watchers on that key + testKey := []byte("foo") + testValue := []byte("bar") + s.Put(testKey, testValue) + + w := s.NewWatcher() + + // put 1 million watchers on the same key + const watcherSize = 1000000 + + cancels := make([]CancelFunc, watcherSize) + for i := 0; i < watcherSize; i++ { + // 0 for startRev to keep watchers in synced + cancel := w.Watch(testKey, true, 0) + cancels[i] = cancel + } + + // randomly cancel watchers to make it not biased towards + // data structures with an order, such as slice. + ix := rand.Perm(watcherSize) + + b.ResetTimer() + b.ReportAllocs() + + for _, idx := range ix { + cancels[idx]() + } +} diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go new file mode 100644 index 000000000..eefae15a3 --- /dev/null +++ b/storage/watchable_store_test.go @@ -0,0 +1,97 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "os" + "testing" +) + +func TestWatch(t *testing.T) { + s := newWatchableStore(tmpPath) + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + testKey := []byte("foo") + testValue := []byte("bar") + s.Put(testKey, testValue) + + w := s.NewWatcher() + w.Watch(testKey, true, 0) + + if _, ok := s.synced[string(testKey)]; !ok { + // the key must have had an entry in synced + t.Errorf("existence = %v, want true", ok) + } +} + +func TestNewWatcherCancel(t *testing.T) { + s := newWatchableStore(tmpPath) + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + testKey := []byte("foo") + testValue := []byte("bar") + s.Put(testKey, testValue) + + w := s.NewWatcher() + cancel := w.Watch(testKey, true, 0) + + cancel() + + if _, ok := s.synced[string(testKey)]; ok { + // the key shoud have been deleted + t.Errorf("existence = %v, want false", ok) + } +} + +func TestUnsafeAddWatching(t *testing.T) { + s := newWatchableStore(tmpPath) + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + testKey := []byte("foo") + testValue := []byte("bar") + s.Put(testKey, testValue) + + wa := &watching{ + key: testKey, + prefix: true, + cur: 0, + } + + if err := unsafeAddWatching(&s.synced, string(testKey), wa); err != nil { + t.Error(err) + } + + if v, ok := s.synced[string(testKey)]; !ok { + // the key must have had entry in synced + t.Errorf("existence = %v, want true", ok) + } else { + if len(v) != 1 { + // the key must have ONE entry in its watching map + t.Errorf("len(v) = %d, want 1", len(v)) + } + } + + if err := unsafeAddWatching(&s.synced, string(testKey), wa); err == nil { + // unsafeAddWatching should have returned error + // when putting the same watch twice" + t.Error(`error = nil, want "put the same watch twice"`) + } +}