Merge pull request #5318 from heyitsanthony/watcher-latency

batch watcher sync to reduce request latency
This commit is contained in:
Anthony Romano 2016-05-11 12:53:20 -07:00
commit b03a2f0323
3 changed files with 65 additions and 39 deletions

View File

@ -30,6 +30,9 @@ const (
// TODO: find a good buf value. 1024 is just a random one that // TODO: find a good buf value. 1024 is just a random one that
// seems to be reasonable. // seems to be reasonable.
chanBufLen = 1024 chanBufLen = 1024
// maxWatchersPerSync is the number of watchers to sync in a single batch
maxWatchersPerSync = 512
) )
type watchable interface { type watchable interface {
@ -231,36 +234,47 @@ func (s *watchableStore) syncWatchersLoop() {
for { for {
s.mu.Lock() s.mu.Lock()
st := time.Now()
lastUnsyncedWatchers := s.unsynced.size()
s.syncWatchers() s.syncWatchers()
unsyncedWatchers := s.unsynced.size()
s.mu.Unlock() s.mu.Unlock()
syncDuration := time.Since(st)
waitDuration := 100 * time.Millisecond
// more work pending?
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
// be fair to other store operations by yielding time taken
waitDuration = syncDuration
}
select { select {
case <-time.After(100 * time.Millisecond): case <-time.After(waitDuration):
case <-s.stopc: case <-s.stopc:
return return
} }
} }
} }
// syncWatchers periodically syncs unsynced watchers by: Iterate all unsynced // syncWatchers syncs unsynced watchers by:
// watchers to get the minimum revision within its range, skipping the // 1. choose a set of watchers from the unsynced watcher group
// watcher if its current revision is behind the compact revision of the // 2. iterate over the set to get the minimum revision and remove compacted watchers
// store. And use this minimum revision to get all key-value pairs. Then send // 3. use minimum revision to get all key-value pairs and send those events to watchers
// those events to watchers. // 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() { func (s *watchableStore) syncWatchers() {
s.store.mu.Lock()
defer s.store.mu.Unlock()
if s.unsynced.size() == 0 { if s.unsynced.size() == 0 {
return return
} }
s.store.mu.Lock()
defer s.store.mu.Unlock()
// in order to find key-value pairs from unsynced watchers, we need to // in order to find key-value pairs from unsynced watchers, we need to
// find min revision index, and these revisions can be used to // find min revision index, and these revisions can be used to
// query the backend store of key-value pairs // query the backend store of key-value pairs
curRev := s.store.currentRev.main curRev := s.store.currentRev.main
compactionRev := s.store.compactMainRev compactionRev := s.store.compactMainRev
minRev := s.unsynced.scanMinRev(curRev, compactionRev) wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes() minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes) revToBytes(revision{main: curRev + 1}, maxBytes)
@ -270,15 +284,22 @@ func (s *watchableStore) syncWatchers() {
tx := s.store.b.BatchTx() tx := s.store.b.BatchTx()
tx.Lock() tx.Lock()
revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
evs := kvsToEvents(&s.unsynced, revs, vs) evs := kvsToEvents(wg, revs, vs)
tx.Unlock() tx.Unlock()
wb := newWatcherBatch(&s.unsynced, evs) wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
eb, ok := wb[w]
if !ok {
// bring un-notified watcher to synced
w.cur = curRev
s.synced.add(w)
s.unsynced.delete(w)
continue
}
for w, eb := range wb {
select { select {
// s.store.Rev also uses Lock, so just return directly case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}:
pendingEventsGauge.Add(float64(len(eb.evs))) pendingEventsGauge.Add(float64(len(eb.evs)))
default: default:
// TODO: handle the full unsynced watchers. // TODO: handle the full unsynced watchers.
@ -295,15 +316,6 @@ func (s *watchableStore) syncWatchers() {
s.unsynced.delete(w) s.unsynced.delete(w)
} }
// bring all un-notified watchers to synced.
for w := range s.unsynced.watchers {
if !wb.contains(w) {
w.cur = curRev
s.synced.add(w)
s.unsynced.delete(w)
}
}
slowWatcherGauge.Set(float64(s.unsynced.size())) slowWatcherGauge.Set(float64(s.unsynced.size()))
} }

View File

@ -75,11 +75,6 @@ func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
eb.add(ev) eb.add(ev)
} }
func (wb watcherBatch) contains(w *watcher) bool {
_, ok := wb[w]
return ok
}
// newWatcherBatch maps watchers to their matched events. It enables quick // newWatcherBatch maps watchers to their matched events. It enables quick
// events look up by watcher. // events look up by watcher.
func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
@ -219,7 +214,23 @@ func (wg *watcherGroup) delete(wa *watcher) bool {
return true return true
} }
func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 { // choose selects watchers from the watcher group to update
func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
if len(wg.watchers) < maxWatchers {
return wg, wg.chooseAll(curRev, compactRev)
}
ret := newWatcherGroup()
for w := range wg.watchers {
if maxWatchers <= 0 {
break
}
maxWatchers--
ret.add(w)
}
return &ret, ret.chooseAll(curRev, compactRev)
}
func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64) minRev := int64(math.MaxInt64)
for w := range wg.watchers { for w := range wg.watchers {
if w.cur > curRev { if w.cur > curRev {

View File

@ -35,19 +35,22 @@ var watchGetCmd = &cobra.Command{
} }
var ( var (
watchGetTotalStreams int watchGetTotalWatchers int
watchEvents int watchGetTotalStreams int
firstWatch sync.Once watchEvents int
firstWatch sync.Once
) )
func init() { func init() {
RootCmd.AddCommand(watchGetCmd) RootCmd.AddCommand(watchGetCmd)
watchGetCmd.Flags().IntVar(&watchGetTotalStreams, "watchers", 10000, "Total number of watchers") watchGetCmd.Flags().IntVar(&watchGetTotalWatchers, "watchers", 10000, "Total number of watchers")
watchGetCmd.Flags().IntVar(&watchGetTotalStreams, "streams", 1, "Total number of watcher streams")
watchGetCmd.Flags().IntVar(&watchEvents, "events", 8, "Number of events per watcher") watchGetCmd.Flags().IntVar(&watchEvents, "events", 8, "Number of events per watcher")
} }
func watchGetFunc(cmd *cobra.Command, args []string) { func watchGetFunc(cmd *cobra.Command, args []string) {
clients := mustCreateClients(totalClients, totalConns) clients := mustCreateClients(totalClients, totalConns)
getClient := mustCreateClients(1, 1)
// setup keys for watchers // setup keys for watchers
watchRev := int64(0) watchRev := int64(0)
@ -70,18 +73,18 @@ func watchGetFunc(cmd *cobra.Command, args []string) {
// results from trying to do serialized gets with concurrent watchers // results from trying to do serialized gets with concurrent watchers
results = make(chan result) results = make(chan result)
bar = pb.New(watchGetTotalStreams * watchEvents) bar = pb.New(watchGetTotalWatchers * watchEvents)
bar.Format("Bom !") bar.Format("Bom !")
bar.Start() bar.Start()
pdoneC := printReport(results) pdoneC := printReport(results)
wg.Add(len(streams)) wg.Add(watchGetTotalWatchers)
ctx, cancel := context.WithCancel(context.TODO()) ctx, cancel := context.WithCancel(context.TODO())
f := func() { f := func() {
doSerializedGet(ctx, clients[0], results) doSerializedGet(ctx, getClient[0], results)
} }
for i := range streams { for i := 0; i < watchGetTotalWatchers; i++ {
go doUnsyncWatch(streams[i], watchRev, f) go doUnsyncWatch(streams[i%len(streams)], watchRev, f)
} }
wg.Wait() wg.Wait()
cancel() cancel()