From 7c17665a1aa62f369ab218e419f131de652c7e16 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 18 Feb 2016 04:12:40 -0800
Subject: [PATCH] storage: limit total unique revisions in unsynced watcher event list

---
 storage/watchable_store.go      | 86 ++++++++++++++++++++++++++++-----
 storage/watchable_store_test.go | 44 +++++++++++++++--
 2 files changed, 112 insertions(+), 18 deletions(-)

diff --git a/storage/watchable_store.go b/storage/watchable_store.go
index 05f6fc6c3..d6e3d6daf 100644
--- a/storage/watchable_store.go
+++ b/storage/watchable_store.go
@@ -34,11 +34,64 @@ const (
 	chanBufLen = 1024
 )
 
+var (
+	// watchBatchMaxRevs is the maximum distinct revisions that
+	// may be sent to an unsynced watcher at a time. Declared as
+	// var instead of const for testing purposes.
+	watchBatchMaxRevs = 1000
+)
+
+type eventBatch struct {
+	// evs is a batch of revision-ordered events
+	evs []storagepb.Event
+	// revs is the minimum unique revisions observed for this batch
+	revs int
+	// moreRev is first revision with more events following this batch
+	moreRev int64
+}
+
 type (
 	watcherSetByKey map[string]watcherSet
 	watcherSet      map[*watcher]struct{}
+	watcherBatch    map[*watcher]*eventBatch
 )
 
+func (eb *eventBatch) add(ev storagepb.Event) {
+	if eb.revs > watchBatchMaxRevs {
+		// maxed out batch size
+		return
+	}
+
+	if len(eb.evs) == 0 {
+		// base case
+		eb.revs = 1
+		eb.evs = append(eb.evs, ev)
+		return
+	}
+
+	// revision accounting
+	ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+	evRev := ev.Kv.ModRevision
+	if evRev > ebRev {
+		eb.revs++
+		if eb.revs > watchBatchMaxRevs {
+			eb.moreRev = evRev
+			return
+		}
+	}
+
+	eb.evs = append(eb.evs, ev)
+}
+
+func (wb watcherBatch) add(w *watcher, ev storagepb.Event) {
+	eb := wb[w]
+	if eb == nil {
+		eb = &eventBatch{}
+		wb[w] = eb
+	}
+	eb.add(ev)
+}
+
 func (w watcherSet) add(wa *watcher) {
 	if _, ok := w[wa]; ok {
 		panic("add watcher twice!")
@@ -310,17 +363,21 @@ func (s *watchableStore) syncWatchers() {
 	evs := kvsToEvents(revs, vs, s.unsynced, prefixes)
 	tx.Unlock()
 
-	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
+	for w, eb := range newWatcherBatch(s.unsynced, evs) {
 		select {
 		// s.store.Rev also uses Lock, so just return directly
-		case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
-			pendingEventsGauge.Add(float64(len(es)))
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}:
+			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// TODO: handle the full unsynced watchers.
 			// continue to process other watchers for now, the full ones
 			// will be processed next time and hopefully it will not be full.
 			continue
 		}
+		if eb.moreRev != 0 {
+			w.cur = eb.moreRev
+			continue
+		}
 		w.cur = curRev
 		s.synced.add(w)
 		s.unsynced.delete(w)
@@ -393,16 +450,19 @@ func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struc
 // notify notifies the fact that given event at the given rev just happened to
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
-	we := newWatcherToEventMap(s.synced, evs)
+	we := newWatcherBatch(s.synced, evs)
 	for _, wm := range s.synced {
 		for w := range wm {
-			es, ok := we[w]
+			eb, ok := we[w]
 			if !ok {
 				continue
 			}
+			if eb.revs != 1 {
+				panic("unexpected multiple revisions in notification")
+			}
 			select {
-			case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}:
-				pendingEventsGauge.Add(float64(len(es)))
+			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
+				pendingEventsGauge.Add(float64(len(eb.evs)))
 			default:
 				// move slow watcher to unsynced
 				w.cur = rev
@@ -433,10 +493,10 @@ type watcher struct {
 	ch chan<- WatchResponse
 }
 
-// newWatcherToEventMap creates a map that has watcher as key and events as
-// value. It enables quick events look up by watcher.
-func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watcher][]storagepb.Event {
-	watcherToEvents := make(map[*watcher][]storagepb.Event)
+// newWatcherBatch maps watchers to their matched events. It enables quick
+// events look up by watcher.
+func newWatcherBatch(sm watcherSetByKey, evs []storagepb.Event) watcherBatch {
+	wb := make(watcherBatch)
 	for _, ev := range evs {
 		key := string(ev.Kv.Key)
 
@@ -453,12 +513,12 @@ func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watche
 				if !w.prefix && i != len(ev.Kv.Key) {
 					continue
 				}
-				watcherToEvents[w] = append(watcherToEvents[w], ev)
+				wb.add(w, ev)
 			}
 		}
 	}
 
-	return watcherToEvents
+	return wb
 }
 
 // matchPrefix returns true if key has any matching prefix
diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go
index f3988683e..51f040dd2 100644
--- a/storage/watchable_store_test.go
+++ b/storage/watchable_store_test.go
@@ -255,6 +255,40 @@ func TestWatchCompacted(t *testing.T) {
 	}
 }
 
+// TestWatchBatchUnsynced tests batching on unsynced watchers
+func TestWatchBatchUnsynced(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{})
+
+	oldMaxRevs := watchBatchMaxRevs
+	defer func() {
+		watchBatchMaxRevs = oldMaxRevs
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	batches := 3
+	watchBatchMaxRevs = 4
+
+	v := []byte("foo")
+	for i := 0; i < watchBatchMaxRevs*batches; i++ {
+		s.Put(v, v, lease.NoLease)
+	}
+
+	w := s.NewWatchStream()
+	w.Watch(v, false, 1)
+	for i := 0; i < batches; i++ {
+		if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
+			t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
+		}
+	}
+
+	s.store.mu.Lock()
+	defer s.store.mu.Unlock()
+	if len(s.synced) != 1 {
+		t.Errorf("synced size = %d, want 1", len(s.synced))
+	}
+}
+
 func TestNewMapwatcherToEventMap(t *testing.T) {
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
@@ -341,16 +375,16 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		gwe := newWatcherToEventMap(tt.sync, tt.evs)
+		gwe := newWatcherBatch(tt.sync, tt.evs)
 		if len(gwe) != len(tt.wwe) {
 			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
 		}
 		// compare gwe and tt.wwe
-		for w, mevs := range gwe {
-			if len(mevs) != len(tt.wwe[w]) {
-				t.Errorf("#%d: len(mevs) got = %d, want = %d", i, len(mevs), len(tt.wwe[w]))
+		for w, eb := range gwe {
+			if len(eb.evs) != len(tt.wwe[w]) {
+				t.Errorf("#%d: len(eb.evs) got = %d, want = %d", i, len(eb.evs), len(tt.wwe[w]))
 			}
-			if !reflect.DeepEqual(mevs, tt.wwe[w]) {
+			if !reflect.DeepEqual(eb.evs, tt.wwe[w]) {
 				t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
 			}
 		}