From 394ce5f3b8d946c0b68931e6af3a2f78a6359a43 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 20 May 2016 00:09:03 -0700 Subject: [PATCH] mvcc: move blocked unsynced watchers to victim list --- mvcc/watchable_store.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 2f0366493..e32abf9e0 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -383,6 +383,7 @@ func (s *watchableStore) syncWatchers() { evs := kvsToEvents(wg, revs, vs) tx.Unlock() + var victims watcherBatch wb := newWatcherBatch(wg, evs) for w := range wg.watchers { eb, ok := wb[w] @@ -394,23 +395,30 @@ func (s *watchableStore) syncWatchers() { continue } + w.cur = curRev + isBlocked := false select { case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}: pendingEventsGauge.Add(float64(len(eb.evs))) default: - // TODO: handle the full unsynced watchers. - // continue to process other watchers for now, the full ones - // will be processed next time and hopefully it will not be full. - continue + if victims == nil { + victims = make(watcherBatch) + } + isBlocked = true } - if eb.moreRev != 0 { - w.cur = eb.moreRev - continue + + if isBlocked { + victims[w] = eb + } else { + if eb.moreRev != 0 { + w.cur = eb.moreRev + continue + } + s.synced.add(w) } - w.cur = curRev - s.synced.add(w) s.unsynced.delete(w) } + s.addVictim(victims) vsz := 0 for _, v := range s.victims {