From 5984e4636497a8f8057609fbc529dfb408463bc8 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 19 May 2016 23:23:51 -0700
Subject: [PATCH] mvcc: move blocked sync watcher work to victim list

Instead of holding the store lock while doing a lot of work, like when
syncing unsynced watchers, the work from a blocked synced notify can be
reused and dispatched without holding the store lock for long.
---
 mvcc/watchable_store.go | 126 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 122 insertions(+), 4 deletions(-)

diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go
index 8ba6b0005..2f0366493 100644
--- a/mvcc/watchable_store.go
+++ b/mvcc/watchable_store.go
@@ -46,6 +46,10 @@ type watchableStore struct {
 
     *store
 
+    // victims are watcher batches that were blocked on the watch channel
+    victims []watcherBatch
+    victimc chan struct{}
+
     // contains all unsynced watchers that needs to sync with events that have happened
     unsynced watcherGroup
 
@@ -68,6 +72,7 @@ func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) Consisten
 func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
     s := &watchableStore{
         store:    NewStore(b, le, ig),
+        victimc:  make(chan struct{}, 1),
         unsynced: newWatcherGroup(),
         synced:   newWatcherGroup(),
         stopc:    make(chan struct{}),
@@ -76,8 +81,9 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
         // use this store as the deleter so revokes trigger watch events
         s.le.SetRangeDeleter(s)
     }
-    s.wg.Add(1)
+    s.wg.Add(2)
     go s.syncWatchersLoop()
+    go s.syncVictimsLoop()
     return s
 }
 
@@ -217,6 +223,15 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
             watcherGauge.Dec()
         } else if s.synced.delete(wa) {
             watcherGauge.Dec()
+        } else {
+            for _, wb := range s.victims {
+                if wb[wa] != nil {
+                    slowWatcherGauge.Dec()
+                    watcherGauge.Dec()
+                    delete(wb, wa)
+                    break
+                }
+            }
         }
         s.mu.Unlock()
 
@@ -254,6 +269,89 @@ func (s *watchableStore) syncWatchersLoop() {
     }
 }
 
+// syncVictimsLoop tries to write precomputed watcher responses to
+// watchers that had a blocked watcher channel
+func (s *watchableStore) syncVictimsLoop() {
+    defer s.wg.Done()
+
+    for {
+        for s.moveVictims() != 0 {
+            // try to update all victim watchers
+        }
+        s.mu.Lock()
+        isEmpty := len(s.victims) == 0
+        s.mu.Unlock()
+
+        var tickc <-chan time.Time
+        if !isEmpty {
+            tickc = time.After(10 * time.Millisecond)
+        }
+
+        select {
+        case <-tickc:
+        case <-s.victimc:
+        case <-s.stopc:
+            return
+        }
+    }
+}
+
+// moveVictims tries to update watches with already pending event data
+func (s *watchableStore) moveVictims() (moved int) {
+    s.mu.Lock()
+    victims := s.victims
+    s.victims = nil
+    s.mu.Unlock()
+
+    var newVictim watcherBatch
+    for _, wb := range victims {
+        // try to send responses again
+        for w, eb := range wb {
+            select {
+            case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
+                pendingEventsGauge.Add(float64(len(eb.evs)))
+            default:
+                if newVictim == nil {
+                    newVictim = make(watcherBatch)
+                }
+                newVictim[w] = eb
+                continue
+            }
+            moved++
+        }
+
+        // assign completed victim watchers to unsync/sync
+        s.mu.Lock()
+        s.store.mu.Lock()
+        curRev := s.store.currentRev.main
+        for w, eb := range wb {
+            if newVictim != nil && newVictim[w] != nil {
+                // couldn't send watch response; stays victim
+                continue
+            }
+            if eb.moreRev != 0 {
+                w.cur = eb.moreRev
+            }
+            if w.cur < curRev {
+                s.unsynced.add(w)
+            } else {
+                slowWatcherGauge.Dec()
+                s.synced.add(w)
+            }
+        }
+        s.store.mu.Unlock()
+        s.mu.Unlock()
+    }
+
+    if len(newVictim) > 0 {
+        s.mu.Lock()
+        s.victims = append(s.victims, newVictim)
+        s.mu.Unlock()
+    }
+
+    return moved
+}
+
 // syncWatchers syncs unsynced watchers by:
 // 1. choose a set of watchers from the unsynced watcher group
 // 2. iterate over the set to get the minimum revision and remove compacted watchers
@@ -314,7 +412,11 @@ func (s *watchableStore) syncWatchers() {
         s.unsynced.delete(w)
     }
 
-    slowWatcherGauge.Set(float64(s.unsynced.size()))
+    vsz := 0
+    for _, v := range s.victims {
+        vsz += len(v)
+    }
+    slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
@@ -343,6 +445,7 @@ func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
 // notify notifies the fact that given event at the given rev just happened to
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
+    var victim watcherBatch
     for w, eb := range newWatcherBatch(&s.synced, evs) {
         if eb.revs != 1 {
             log.Panicf("mvcc: unexpected multiple revisions in notification")
@@ -351,13 +454,28 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
         case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
             pendingEventsGauge.Add(float64(len(eb.evs)))
         default:
-            // move slow watcher to unsynced
+            // move slow watcher to victims
             w.cur = rev
-            s.unsynced.add(w)
+            if victim == nil {
+                victim = make(watcherBatch)
+            }
+            victim[w] = eb
             s.synced.delete(w)
             slowWatcherGauge.Inc()
         }
     }
+    s.addVictim(victim)
+}
+
+func (s *watchableStore) addVictim(victim watcherBatch) {
+    if victim == nil {
+        return
+    }
+    s.victims = append(s.victims, victim)
+    select {
+    case s.victimc <- struct{}{}:
+    default:
+    }
 }
 
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
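
The core of moveVictims, reduced to its shape: steal the whole victim list
while briefly holding the lock, retry each blocked channel send with a
non-blocking select and no lock held, and re-park only what still cannot be
delivered. Here is a minimal standalone sketch of that pattern; the
notifier/event/park names are illustrative assumptions for this sketch, not
etcd identifiers, and it omits the patch's reassignment of recovered watchers
to the synced/unsynced groups.

package main

import (
	"fmt"
	"sync"
)

type event struct{ rev int64 }

type notifier struct {
	mu      sync.Mutex
	victims []map[chan event]event // parked sends, batched like []watcherBatch
}

// park records a send that would have blocked the publisher.
func (n *notifier) park(ch chan event, ev event) {
	n.mu.Lock()
	n.victims = append(n.victims, map[chan event]event{ch: ev})
	n.mu.Unlock()
}

// moveVictims retries parked sends and reports how many went through.
func (n *notifier) moveVictims() (moved int) {
	// steal the whole list so retries happen without holding the lock
	n.mu.Lock()
	victims := n.victims
	n.victims = nil
	n.mu.Unlock()

	var still []map[chan event]event
	for _, batch := range victims {
		for ch, ev := range batch {
			select {
			case ch <- ev:
				moved++
			default: // receiver still slow; keep it parked
				still = append(still, map[chan event]event{ch: ev})
			}
		}
	}

	if len(still) > 0 {
		n.mu.Lock()
		n.victims = append(n.victims, still...)
		n.mu.Unlock()
	}
	return moved
}

func main() {
	n := &notifier{}
	ch := make(chan event, 1)
	ch <- event{rev: 1} // fill the buffer so the next send would block
	n.park(ch, event{rev: 2})

	fmt.Println(n.moveVictims()) // 0: ch is still full
	<-ch                         // receiver catches up
	fmt.Println(n.moveVictims()) // 1: the parked event is delivered
}

In the patch, this same non-blocking-send-plus-re-park logic is what lets
notify and moveVictims make progress without ever blocking on a slow
watcher's channel while a lock is held.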
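
The victimc channel is a coalescing wakeup: it has capacity 1 and addVictim
writes to it with a non-blocking select, so any number of notify calls
collapse into at most one pending signal and the notifier never waits on the
background loop. A minimal standalone sketch of that pattern follows; the
names and timings are illustrative, not taken from the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	// capacity 1: holds at most one pending wakeup, like victimc
	wakec := make(chan struct{}, 1)
	stopc := make(chan struct{})
	donec := make(chan struct{})

	go func() {
		defer close(donec)
		for {
			select {
			case <-wakec:
				// one wakeup may stand for many coalesced signals
				fmt.Println("woke up, processing parked work")
			case <-stopc:
				return
			}
		}
	}()

	// signal 1000 times without ever blocking
	for i := 0; i < 1000; i++ {
		select {
		case wakec <- struct{}{}:
		default: // a wakeup is already pending; coalesce with it
		}
	}

	time.Sleep(50 * time.Millisecond)
	close(stopc)
	<-donec
}

Running this prints far fewer lines than the 1000 signals sent, which is the
point of the design: the signaling side pays O(1) per event no matter how far
behind the consumer is.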