diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index ad17b2be7..f3379e5a8 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -218,6 +218,7 @@ func (s *watchableStore) syncWatchersLoop() { waitDuration := 100 * time.Millisecond delayTicker := time.NewTicker(waitDuration) defer delayTicker.Stop() + var evs []mvccpb.Event for { s.mu.RLock() @@ -227,7 +228,7 @@ func (s *watchableStore) syncWatchersLoop() { unsyncedWatchers := 0 if lastUnsyncedWatchers > 0 { - unsyncedWatchers = s.syncWatchers() + unsyncedWatchers, evs = s.syncWatchers(evs) } syncDuration := time.Since(st) @@ -336,12 +337,12 @@ func (s *watchableStore) moveVictims() (moved int) { // 2. iterate over the set to get the minimum revision and remove compacted watchers // 3. use minimum revision to get all key-value pairs and send those events to watchers // 4. remove synced watchers in set from unsynced group and move to synced group -func (s *watchableStore) syncWatchers() int { +func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) { s.mu.Lock() defer s.mu.Unlock() if s.unsynced.size() == 0 { - return 0 + return 0, evs } s.store.revMu.RLock() @@ -354,23 +355,42 @@ func (s *watchableStore) syncWatchers() int { compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) - minBytes, maxBytes := NewRevBytes(), NewRevBytes() - minBytes = RevToBytes(Revision{Main: minRev}, minBytes) - maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes) - // UnsafeRange returns keys and values. And in boltdb, keys are revisions. - // values are actual key-value pairs in backend. - tx := s.store.b.ReadTx() - tx.RLock() - revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) - evs := kvsToEvents(s.store.lg, wg, revs, vs) - // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. - // We can only unlock after Unmarshal, which will do deep copy. - // Otherwise we will trigger SIGSEGV during boltdb re-mmap. - tx.RUnlock() + if len(evs) == 0 { + evs = s.events(minRev, curRev+1) + } else { + if evs[0].Kv.ModRevision > minRev { + evs = append(s.events(minRev, evs[0].Kv.ModRevision), evs...) + } + + if len(evs) == 0 { + evs = s.events(minRev, curRev+1) + } + if len(evs) != 0 && evs[len(evs)-1].Kv.ModRevision < curRev { + evs = append(evs, s.events(evs[len(evs)-1].Kv.ModRevision+1, curRev+1)...) + } + } + // TODO: Ensure thread safe access to victims + victimsMinRev := minRev + for _, vic := range s.victims { + for w := range vic { + if victimsMinRev > w.minRev { + victimsMinRev = w.minRev + } + } + } + victimsRevIndex := 0 + for victimsRevIndex < len(evs) && evs[victimsRevIndex].Kv.ModRevision < compactionRev { + victimsRevIndex++ + } + evs = evs[victimsRevIndex:] + minRevIndex := 0 + for minRevIndex < len(evs) && evs[minRevIndex].Kv.ModRevision < minRev { + minRevIndex++ + } victims := make(watcherBatch) - wb := newWatcherBatch(wg, evs) + wb := newWatcherBatch(wg, evs[minRevIndex:]) for w := range wg.watchers { if w.minRev < compactionRev { // Skip the watcher that failed to send compacted watch response due to w.ch is full. @@ -416,21 +436,34 @@ func (s *watchableStore) syncWatchers() int { } slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) - return s.unsynced.size() + return s.unsynced.size(), evs +} + +func (s *watchableStore) events(minRev, maxRev int64) []mvccpb.Event { + minBytes, maxBytes := NewRevBytes(), NewRevBytes() + minBytes = RevToBytes(Revision{Main: minRev}, minBytes) + maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes) + // UnsafeRange returns keys and values. And in boltdb, keys are revisions. + // values are actual key-value pairs in backend. + tx := s.store.b.ReadTx() + tx.RLock() + revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) + evs := kvsToEvents(s.store.lg, revs, vs) + // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. + // We can only unlock after Unmarshal, which will do deep copy. + // Otherwise we will trigger SIGSEGV during boltdb re-mmap. + tx.RUnlock() + return evs } // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { +func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) } - if !wg.contains(string(kv.Key)) { - continue - } - ty := mvccpb.PUT if isTombstone(revs[i]) { ty = mvccpb.DELETE