From f73d0ed1d99fd7975c143138857e6c355ac567db Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 21 Oct 2015 12:06:27 -0700 Subject: [PATCH] storage: use map for watchable store unsynced This is for `TODO: use map to reduce cancel cost`. I switched slice to map, and benchmark results show that map implementation performs better, as follows: ``` [1]: benchmark old ns/op new ns/op delta BenchmarkWatchableStoreUnsyncedCancel 215212 1307 -99.39% BenchmarkWatchableStoreUnsyncedCancel-2 120453 710 -99.41% BenchmarkWatchableStoreUnsyncedCancel-4 120765 748 -99.38% BenchmarkWatchableStoreUnsyncedCancel-8 121391 719 -99.41% benchmark old allocs new allocs delta BenchmarkWatchableStoreUnsyncedCancel 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-2 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-8 0 0 +0.00% benchmark old bytes new bytes delta BenchmarkWatchableStoreUnsyncedCancel 200 1 -99.50% BenchmarkWatchableStoreUnsyncedCancel-2 138 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-4 138 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-8 139 0 -100.00% [2]: benchmark old ns/op new ns/op delta BenchmarkWatchableStoreUnsyncedCancel 212550 1117 -99.47% BenchmarkWatchableStoreUnsyncedCancel-2 120927 691 -99.43% BenchmarkWatchableStoreUnsyncedCancel-4 120752 699 -99.42% BenchmarkWatchableStoreUnsyncedCancel-8 121012 688 -99.43% benchmark old allocs new allocs delta BenchmarkWatchableStoreUnsyncedCancel 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-2 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-8 0 0 +0.00% benchmark old bytes new bytes delta BenchmarkWatchableStoreUnsyncedCancel 197 1 -99.49% BenchmarkWatchableStoreUnsyncedCancel-2 138 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-4 138 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-8 139 0 -100.00% [3]: benchmark old ns/op new ns/op delta BenchmarkWatchableStoreUnsyncedCancel 214268 1183 -99.45% 
BenchmarkWatchableStoreUnsyncedCancel-2 120763 759 -99.37% BenchmarkWatchableStoreUnsyncedCancel-4 120321 708 -99.41% BenchmarkWatchableStoreUnsyncedCancel-8 121628 680 -99.44% benchmark old allocs new allocs delta BenchmarkWatchableStoreUnsyncedCancel 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-2 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-8 0 0 +0.00% benchmark old bytes new bytes delta BenchmarkWatchableStoreUnsyncedCancel 200 1 -99.50% BenchmarkWatchableStoreUnsyncedCancel-2 139 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-4 138 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-8 139 0 -100.00% [4]: benchmark old ns/op new ns/op delta BenchmarkWatchableStoreUnsyncedCancel 208332 1089 -99.48% BenchmarkWatchableStoreUnsyncedCancel-2 121011 691 -99.43% BenchmarkWatchableStoreUnsyncedCancel-4 120678 681 -99.44% BenchmarkWatchableStoreUnsyncedCancel-8 121303 721 -99.41% benchmark old allocs new allocs delta BenchmarkWatchableStoreUnsyncedCancel 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-2 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-8 0 0 +0.00% benchmark old bytes new bytes delta BenchmarkWatchableStoreUnsyncedCancel 194 1 -99.48% BenchmarkWatchableStoreUnsyncedCancel-2 139 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-4 139 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-8 139 0 -100.00% [5]: benchmark old ns/op new ns/op delta BenchmarkWatchableStoreUnsyncedCancel 211900 1097 -99.48% BenchmarkWatchableStoreUnsyncedCancel-2 121795 753 -99.38% BenchmarkWatchableStoreUnsyncedCancel-4 123182 700 -99.43% BenchmarkWatchableStoreUnsyncedCancel-8 122820 688 -99.44% benchmark old allocs new allocs delta BenchmarkWatchableStoreUnsyncedCancel 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-2 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreUnsyncedCancel-8 0 0 +0.00% benchmark old bytes new bytes delta 
BenchmarkWatchableStoreUnsyncedCancel 198 1 -99.49% BenchmarkWatchableStoreUnsyncedCancel-2 140 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-4 141 0 -100.00% BenchmarkWatchableStoreUnsyncedCancel-8 141 0 -100.00% ``` --- storage/watchable_store.go | 38 ++++++------- storage/watchable_store_bench_test.go | 81 +++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 storage/watchable_store_bench_test.go diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 5126baba6..847a57ff9 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -28,8 +28,8 @@ type watchableStore struct { *store // contains all unsynced watchers that needs to sync events that have happened - // TODO: use map to reduce cancel cost - unsynced []*watcher + unsynced map[*watcher]struct{} + // contains all synced watchers that are tracking the events that will happen // The key of the map is the key that the watcher is watching on. synced map[string][]*watcher @@ -41,9 +41,10 @@ type watchableStore struct { func newWatchableStore(path string) *watchableStore { s := &watchableStore{ - store: newStore(path), - synced: make(map[string][]*watcher), - stopc: make(chan struct{}), + store: newStore(path), + unsynced: make(map[*watcher]struct{}), + synced: make(map[string][]*watcher), + stopc: make(chan struct{}), } s.wg.Add(1) go s.syncWatchersLoop() @@ -161,7 +162,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch s.synced[k] = append(s.synced[k], wa) } else { slowWatchersGauge.Inc() - s.unsynced = append(s.unsynced, wa) + s.unsynced[wa] = struct{}{} } watchersGauge.Inc() @@ -171,13 +172,11 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watch wa.stopWithError(ErrCanceled) // remove global references of the watcher - for i, w := range s.unsynced { - if w == wa { - s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...) 
- slowWatchersGauge.Dec() - watchersGauge.Dec() - return - } + if _, ok := s.unsynced[wa]; ok { + delete(s.unsynced, wa) + slowWatchersGauge.Dec() + watchersGauge.Dec() + return } for i, w := range s.synced[k] { @@ -212,11 +211,7 @@ func (s *watchableStore) syncWatchersLoop() { // syncWatchers syncs the watchers in the unsyncd map. func (s *watchableStore) syncWatchers() { _, curRev, _ := s.store.Range(nil, nil, 0, 0) - - // filtering without allocating - // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - nws := s.unsynced[:0] - for _, w := range s.unsynced { + for w := range s.unsynced { var end []byte if w.prefix { end = make([]byte, len(w.key)) @@ -226,12 +221,12 @@ func (s *watchableStore) syncWatchers() { limit := cap(w.ch) - len(w.ch) // the channel is full, try it in the next round if limit == 0 { - nws = append(nws, w) continue } evs, nextRev, err := s.store.RangeEvents(w.key, end, int64(limit), w.cur) if err != nil { w.stopWithError(err) + delete(s.unsynced, w) continue } @@ -243,13 +238,12 @@ func (s *watchableStore) syncWatchers() { // switch to tracking future events if needed if nextRev > curRev { s.synced[string(w.key)] = append(s.synced[string(w.key)], w) + delete(s.unsynced, w) continue } // put it back to try it in the next round w.cur = nextRev - nws = append(nws, w) } - s.unsynced = nws slowWatchersGauge.Set(float64(len(s.unsynced))) } @@ -277,7 +271,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { nws = append(nws, w) default: w.cur = rev - s.unsynced = append(s.unsynced, w) + s.unsynced[w] = struct{}{} slowWatchersGauge.Inc() } } diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go new file mode 100644 index 000000000..4c84b6097 --- /dev/null +++ b/storage/watchable_store_bench_test.go @@ -0,0 +1,81 @@ +// Copyright 2015 CoreOS, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "math/rand" + "os" + "testing" +) + +// Benchmarks on cancel function performance for unsynced watchers +// in a WatchableStore. It creates k*N watchers to populate unsynced +// with a reasonably large number of watchers. And measures the time it +// takes to cancel N watchers out of k*N watchers. The performance is +// expected to differ depending on the unsynced member implementation. +// TODO: k is an arbitrary constant. We need to figure out what factor +// we should put to simulate the real-world use cases. +func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { + const k int = 2 + benchSampleSize := b.N + watcherSize := k * benchSampleSize + // manually create watchableStore instead of newWatchableStore + // because newWatchableStore periodically calls syncWatchersLoop + // method to sync watchers in unsynced map. We want to keep watchers + // in unsynced for this benchmark. + s := &watchableStore{ + store: newStore(tmpPath), + unsynced: make(map[*watcher]struct{}), + + // For previous implementation, use: + // unsynced: make([]*watcher, 0), + + // to make the test not crash from assigning to nil map. + // 'synced' doesn't get populated in this test. + synced: make(map[string][]*watcher), + } + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + // Put a key so that we can spawn watchers on that key + // (testKey in this test). 
This increases the rev to 1, + // and later we can set the watcher's startRev to 1, + // and force watchers to be in unsynced. + testKey := []byte("foo") + testValue := []byte("bar") + s.Put(testKey, testValue) + + cancels := make([]CancelFunc, watcherSize) + for i := 0; i < watcherSize; i++ { + // non-0 value to keep watchers in unsynced + _, cancel := s.Watcher(testKey, true, 1) + cancels[i] = cancel + } + + // random-cancel N watchers to make it not biased towards + // data structures with an order, such as slice. + ix := rand.Perm(watcherSize) + + b.ResetTimer() + b.ReportAllocs() + + // cancel N watchers + for _, idx := range ix[:benchSampleSize] { + cancels[idx]() + } +}