mvcc: move blocked sync watcher work to victim list

Instead of holding the store lock while doing a lot of work like when syncung
unsynced watchers, the work from a blocked synced notify can be reused and
dispatched without holding the store lock for long.
This commit is contained in:
Anthony Romano 2016-05-19 23:23:51 -07:00
parent 0b34b236d6
commit 5984e46364

View File

@ -46,6 +46,10 @@ type watchableStore struct {
*store
// victims are watcher batches that were blocked on the watch channel
victims []watcherBatch
victimc chan struct{}
// contains all unsynced watchers that needs to sync with events that have happened
unsynced watcherGroup
@ -68,6 +72,7 @@ func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) Consisten
func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
s := &watchableStore{
store: NewStore(b, le, ig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
@ -76,8 +81,9 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(s)
}
s.wg.Add(1)
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
return s
}
@ -217,6 +223,15 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
watcherGauge.Dec()
} else if s.synced.delete(wa) {
watcherGauge.Dec()
} else {
for _, wb := range s.victims {
if wb[wa] != nil {
slowWatcherGauge.Dec()
watcherGauge.Dec()
delete(wb, wa)
break
}
}
}
s.mu.Unlock()
@ -254,6 +269,89 @@ func (s *watchableStore) syncWatchersLoop() {
}
}
// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
defer s.wg.Done()
for {
for s.moveVictims() != 0 {
// try to update all victim watchers
}
s.mu.Lock()
isEmpty := len(s.victims) == 0
s.mu.Unlock()
var tickc <-chan time.Time
if !isEmpty {
tickc = time.After(10 * time.Millisecond)
}
select {
case <-tickc:
case <-s.victimc:
case <-s.stopc:
return
}
}
}
// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
s.mu.Lock()
victims := s.victims
s.victims = nil
s.mu.Unlock()
var newVictim watcherBatch
for _, wb := range victims {
// try to send responses again
for w, eb := range wb {
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
if newVictim == nil {
newVictim = make(watcherBatch)
}
newVictim[w] = eb
continue
}
moved++
}
// assign completed victim watchers to unsync/sync
s.mu.Lock()
s.store.mu.Lock()
curRev := s.store.currentRev.main
for w, eb := range wb {
if newVictim != nil && newVictim[w] != nil {
// couldn't send watch response; stays victim
continue
}
if eb.moreRev != 0 {
w.cur = eb.moreRev
}
if w.cur < curRev {
s.unsynced.add(w)
} else {
slowWatcherGauge.Dec()
s.synced.add(w)
}
}
s.store.mu.Unlock()
s.mu.Unlock()
}
if len(newVictim) > 0 {
s.mu.Lock()
s.victims = append(s.victims, newVictim)
s.mu.Unlock()
}
return moved
}
// syncWatchers syncs unsynced watchers by:
// 1. choose a set of watchers from the unsynced watcher group
// 2. iterate over the set to get the minimum revision and remove compacted watchers
@ -314,7 +412,11 @@ func (s *watchableStore) syncWatchers() {
s.unsynced.delete(w)
}
slowWatcherGauge.Set(float64(s.unsynced.size()))
vsz := 0
for _, v := range s.victims {
vsz += len(v)
}
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
}
// kvsToEvents gets all events for the watchers from all key-value pairs
@ -343,6 +445,7 @@ func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
// notify notifies the fact that given event at the given rev just happened to
// watchers that watch on the key of the event.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
var victim watcherBatch
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
log.Panicf("mvcc: unexpected multiple revisions in notification")
@ -351,13 +454,28 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
// move slow watcher to unsynced
// move slow watcher to victims
w.cur = rev
s.unsynced.add(w)
if victim == nil {
victim = make(watcherBatch)
}
victim[w] = eb
s.synced.delete(w)
slowWatcherGauge.Inc()
}
}
s.addVictim(victim)
}
func (s *watchableStore) addVictim(victim watcherBatch) {
if victim == nil {
return
}
s.victims = append(s.victims, victim)
select {
case s.victimc <- struct{}{}:
default:
}
}
func (s *watchableStore) rev() int64 { return s.store.Rev() }