From 2cbf7cf6d1e4c283056680824c6f26b64d5df595 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 17 Feb 2016 02:49:04 -0800 Subject: [PATCH] storage: do not send outdated events to unsynced watchers --- storage/watchable_store.go | 145 +++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 72 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 28729e453..d0a54a7d0 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -155,6 +155,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { evs[i] = storagepb.Event{ Type: storagepb.DELETE, Kv: &change} + evs[i].Kv.ModRevision = rev } s.notify(rev, evs) return n, rev @@ -177,6 +178,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { return nil } + rev := s.store.Rev() evs := make([]storagepb.Event, len(changes)) for i, change := range changes { switch change.Value { @@ -184,6 +186,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { evs[i] = storagepb.Event{ Type: storagepb.DELETE, Kv: &changes[i]} + evs[i].Kv.ModRevision = rev default: evs[i] = storagepb.Event{ Type: storagepb.PUT, @@ -191,7 +194,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error { } } - s.notify(s.store.Rev(), evs) + s.notify(rev, evs) s.mu.Unlock() return nil @@ -284,40 +287,8 @@ func (s *watchableStore) syncWatchers() { // in order to find key-value pairs from unsynced watchers, we need to // find min revision index, and these revisions can be used to // query the backend store of key-value pairs - minRev := int64(math.MaxInt64) - + prefixes, minRev := s.scanUnsync() curRev := s.store.currentRev.main - compactionRev := s.store.compactMainRev - - prefixes := make(map[string]struct{}) - for _, set := range s.unsynced { - for w := range set { - k := string(w.key) - - if w.cur > curRev { - panic("watcher current revision should not exceed current revision") - } - - if w.cur < compactionRev { - select { - case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactionRev}: - s.unsynced.delete(w) - default: - // retry next time - } - continue - } - - if minRev >= w.cur { - minRev = w.cur - } - - if w.prefix { - prefixes[k] = struct{}{} - } - } - } - minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes) @@ -326,33 +297,8 @@ func (s *watchableStore) syncWatchers() { // values are actual key-value pairs in backend. tx := s.store.b.BatchTx() tx.Lock() - ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) - - evs := []storagepb.Event{} - - // get the list of all events from all key-value pairs - for i, v := range vs { - var kv storagepb.KeyValue - if err := kv.Unmarshal(v); err != nil { - log.Panicf("storage: cannot unmarshal event: %v", err) - } - - k := string(kv.Key) - if _, ok := s.unsynced.getSetByKey(k); !ok && !matchPrefix(k, prefixes) { - continue - } - - var ev storagepb.Event - switch { - case isTombstone(ks[i]): - ev.Type = storagepb.DELETE - default: - ev.Type = storagepb.PUT - } - ev.Kv = &kv - - evs = append(evs, ev) - } + revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) + evs := kvsToEvents(revs, vs, s.unsynced, prefixes) tx.Unlock() for w, es := range newWatcherToEventMap(s.unsynced, evs) { @@ -374,6 +320,67 @@ func (s *watchableStore) syncWatchers() { slowWatcherGauge.Set(float64(len(s.unsynced))) } +func (s *watchableStore) scanUnsync() (prefixes map[string]struct{}, minRev int64) { + curRev := s.store.currentRev.main + compactionRev := s.store.compactMainRev + + prefixes = make(map[string]struct{}) + minRev = int64(math.MaxInt64) + for _, set := range s.unsynced { + for w := range set { + k := string(w.key) + + if w.cur > curRev { + panic("watcher current revision should not exceed current revision") + } + + if w.cur < compactionRev { + select { + case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactionRev}: + s.unsynced.delete(w) + default: + // retry next time + } + continue + } + + if minRev > w.cur { + minRev = w.cur + } + + if w.prefix { + prefixes[k] = struct{}{} + } + } + } + + return prefixes, minRev +} + +// kvsToEvents gets all events for the watchers from all key-value pairs +func kvsToEvents(revs, vals [][]byte, wsk watcherSetByKey, pfxs map[string]struct{}) (evs []storagepb.Event) { + for i, v := range vals { + var kv storagepb.KeyValue + if err := kv.Unmarshal(v); err != nil { + log.Panicf("storage: cannot unmarshal event: %v", err) + } + + k := string(kv.Key) + if _, ok := wsk.getSetByKey(k); !ok && !matchPrefix(k, pfxs) { + continue + } + + ty := storagepb.PUT + if isTombstone(revs[i]) { + ty = storagepb.DELETE + // patch in mod revision so watchers won't skip + kv.ModRevision = bytesToRev(revs[i]).main + } + evs = append(evs, storagepb.Event{Kv: &kv, Type: ty}) + } + return evs +} + // notify notifies the fact that given event at the given rev just happened to // watchers that watch on the key of the event. func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { @@ -426,23 +433,17 @@ func newWatcherToEventMap(sm watcherSetByKey, evs []storagepb.Event) map[*watche // check all prefixes of the key to notify all corresponded watchers for i := 0; i <= len(key); i++ { - k := string(key[:i]) + for w := range sm[key[:i]] { + // don't double notify + if ev.Kv.ModRevision < w.cur { + continue + } - wm, ok := sm[k] - if !ok { - continue - } - - for w := range wm { // the watcher needs to be notified when either it watches prefix or // the key is exactly matched. if !w.prefix && i != len(ev.Kv.Key) { continue } - - if _, ok := watcherToEvents[w]; !ok { - watcherToEvents[w] = []storagepb.Event{} - } watcherToEvents[w] = append(watcherToEvents[w], ev) } }