From 080272be175407c17018c22d353739ee35ecf31c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 11 May 2016 02:53:17 -0700 Subject: [PATCH] mvcc: limit total watchers synced per sync Fixes #4567 --- mvcc/watchable_store.go | 60 ++++++++++++++++++++++++----------------- mvcc/watcher_group.go | 23 +++++++++++----- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 1ef571512..3dea895b1 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -30,6 +30,9 @@ const ( // TODO: find a good buf value. 1024 is just a random one that // seems to be reasonable. chanBufLen = 1024 + + // maxWatchersPerSync is the number of watchers to sync in a single batch + maxWatchersPerSync = 512 ) type watchable interface { @@ -231,36 +234,47 @@ func (s *watchableStore) syncWatchersLoop() { for { s.mu.Lock() + st := time.Now() + lastUnsyncedWatchers := s.unsynced.size() s.syncWatchers() + unsyncedWatchers := s.unsynced.size() s.mu.Unlock() + syncDuration := time.Since(st) + + waitDuration := 100 * time.Millisecond + // more work pending? + if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers { + // be fair to other store operations by yielding time taken + waitDuration = syncDuration + } select { - case <-time.After(100 * time.Millisecond): + case <-time.After(waitDuration): case <-s.stopc: return } } } -// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced -// watchers to get the minimum revision within its range, skipping the -// watcher if its current revision is behind the compact revision of the -// store. And use this minimum revision to get all key-value pairs. Then send -// those events to watchers. +// syncWatchers syncs unsynced watchers by: +// 1. choose a set of watchers from the unsynced watcher group +// 2. iterate over the set to get the minimum revision and remove compacted watchers +// 3. use minimum revision to get all key-value pairs and send those events to watchers +// 4. remove synced watchers in set from unsynced group and move to synced group func (s *watchableStore) syncWatchers() { - s.store.mu.Lock() - defer s.store.mu.Unlock() - if s.unsynced.size() == 0 { return } + s.store.mu.Lock() + defer s.store.mu.Unlock() + // in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs curRev := s.store.currentRev.main compactionRev := s.store.compactMainRev - minRev := s.unsynced.scanMinRev(curRev, compactionRev) + wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes) @@ -270,15 +284,22 @@ func (s *watchableStore) syncWatchers() { tx := s.store.b.BatchTx() tx.Lock() revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - evs := kvsToEvents(&s.unsynced, revs, vs) + evs := kvsToEvents(wg, revs, vs) tx.Unlock() - wb := newWatcherBatch(&s.unsynced, evs) + wb := newWatcherBatch(wg, evs) + for w := range wg.watchers { + eb, ok := wb[w] + if !ok { + // bring un-notified watcher to synced + w.cur = curRev + s.synced.add(w) + s.unsynced.delete(w) + continue + } - for w, eb := range wb { select { - // s.store.Rev also uses Lock, so just return directly - case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}: + case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}: pendingEventsGauge.Add(float64(len(eb.evs))) default: // TODO: handle the full unsynced watchers. @@ -295,15 +316,6 @@ func (s *watchableStore) syncWatchers() { s.unsynced.delete(w) } - // bring all un-notified watchers to synced. - for w := range s.unsynced.watchers { - if !wb.contains(w) { - w.cur = curRev - s.synced.add(w) - s.unsynced.delete(w) - } - } - slowWatcherGauge.Set(float64(s.unsynced.size())) } diff --git a/mvcc/watcher_group.go b/mvcc/watcher_group.go index 4b654b751..7990e8bf0 100644 --- a/mvcc/watcher_group.go +++ b/mvcc/watcher_group.go @@ -75,11 +75,6 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) { eb.add(ev) } -func (wb watcherBatch) contains(w *watcher) bool { - _, ok := wb[w] - return ok -} - // newWatcherBatch maps watchers to their matched events. It enables quick // events look up by watcher. func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { @@ -219,7 +214,23 @@ func (wg *watcherGroup) delete(wa *watcher) bool { return true } -func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 { +// choose selects watchers from the watcher group to update +func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) { + if len(wg.watchers) < maxWatchers { + return wg, wg.chooseAll(curRev, compactRev) + } + ret := newWatcherGroup() + for w := range wg.watchers { + if maxWatchers <= 0 { + break + } + maxWatchers-- + ret.add(w) + } + return &ret, ret.chooseAll(curRev, compactRev) +} + +func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { if w.cur > curRev {