Merge 05db32124e948a9d886a31fdcd0bcbd629fa6bb0 into c86c93ca2951338115159dcdd20711603044e1f1

Marek Siarkowicz 2024-09-26 22:00:14 +01:00 committed by GitHub
commit 1fb8ecc6f9

@@ -218,6 +218,7 @@ func (s *watchableStore) syncWatchersLoop() {
	waitDuration := 100 * time.Millisecond
	delayTicker := time.NewTicker(waitDuration)
	defer delayTicker.Stop()
+	var evs []mvccpb.Event
	for {
		s.mu.RLock()
@@ -227,7 +228,7 @@ func (s *watchableStore) syncWatchersLoop() {
		unsyncedWatchers := 0
		if lastUnsyncedWatchers > 0 {
-			unsyncedWatchers = s.syncWatchers()
+			unsyncedWatchers, evs = s.syncWatchers(evs)
		}
		syncDuration := time.Since(st)
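
The two hunks above change the loop-level contract: the event slice now lives outside the for loop and is handed to, and returned by, every sync pass. A toy, self-contained sketch of that pattern (all names hypothetical, not part of the patch):

	package main

	import "fmt"

	// toyStore stands in for watchableStore; syncOnce mimics the new
	// syncWatchers signature: it takes the cache, may extend it, and
	// returns it so the next iteration can reuse it.
	type toyStore struct{ nextRev int64 }

	func (s *toyStore) syncOnce(cache []int64) (pending int, _ []int64) {
		cache = append(cache, s.nextRev) // pretend one new revision was fetched
		s.nextRev++
		return 0, cache
	}

	func main() {
		s := &toyStore{nextRev: 1}
		var cache []int64 // declared before the loop, like `var evs []mvccpb.Event`
		for i := 0; i < 3; i++ {
			_, cache = s.syncOnce(cache)
		}
		fmt.Println(cache) // [1 2 3]
	}
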
@@ -336,12 +337,12 @@ func (s *watchableStore) moveVictims() (moved int) {
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() int {
+func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.unsynced.size() == 0 {
-		return 0
+		return 0, evs
	}
	s.store.revMu.RLock()
@@ -354,23 +355,42 @@ func (s *watchableStore) syncWatchers() int {
	compactionRev := s.store.compactMainRev
	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
-	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
-	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
-	maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
-	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
-	// values are actual key-value pairs in backend.
-	tx := s.store.b.ReadTx()
-	tx.RLock()
-	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
-	evs := kvsToEvents(s.store.lg, wg, revs, vs)
-	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
-	// We can only unlock after Unmarshal, which will do deep copy.
-	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
-	tx.RUnlock()
+	if len(evs) == 0 {
+		evs = s.events(minRev, curRev+1)
+	} else {
+		if evs[0].Kv.ModRevision > minRev {
+			evs = append(s.events(minRev, evs[0].Kv.ModRevision), evs...)
+		}
+		if len(evs) == 0 {
+			evs = s.events(minRev, curRev+1)
+		}
+		if len(evs) != 0 && evs[len(evs)-1].Kv.ModRevision < curRev {
+			evs = append(evs, s.events(evs[len(evs)-1].Kv.ModRevision+1, curRev+1)...)
+		}
+	}
+	// TODO: Ensure thread safe access to victims
+	victimsMinRev := minRev
+	for _, vic := range s.victims {
+		for w := range vic {
+			if victimsMinRev > w.minRev {
+				victimsMinRev = w.minRev
+			}
+		}
+	}
+	victimsRevIndex := 0
+	for victimsRevIndex < len(evs) && evs[victimsRevIndex].Kv.ModRevision < compactionRev {
+		victimsRevIndex++
+	}
+	evs = evs[victimsRevIndex:]
+	minRevIndex := 0
+	for minRevIndex < len(evs) && evs[minRevIndex].Kv.ModRevision < minRev {
+		minRevIndex++
+	}
	victims := make(watcherBatch)
-	wb := newWatcherBatch(wg, evs)
+	wb := newWatcherBatch(wg, evs[minRevIndex:])
	for w := range wg.watchers {
		if w.minRev < compactionRev {
			// Skip the watcher that failed to send compacted watch response due to w.ch is full.
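
The hunk above replaces the unconditional boltdb range read with an event cache that survives between calls. A minimal, self-contained sketch of the underlying idea, using bare revision numbers instead of mvccpb.Event and a hypothetical extendCache helper (an assumption-laden simplification, not the patch itself): the cache covers a contiguous ascending revision range, so serving [minRev, curRev] only requires filling the gaps at either end, and everything below the compaction revision can be dropped.

	package main

	import "fmt"

	// extendCache is a simplified version of the evs handling in the hunk above:
	// cached is an ascending slice of revisions and fetch(lo, hi) reads the
	// revisions in the half-open range [lo, hi).
	func extendCache(cached []int64, minRev, curRev, compactionRev int64, fetch func(lo, hi int64) []int64) []int64 {
		if len(cached) == 0 {
			cached = fetch(minRev, curRev+1)
		} else {
			if cached[0] > minRev {
				// fill the gap below the cached range
				cached = append(fetch(minRev, cached[0]), cached...)
			}
			if last := cached[len(cached)-1]; last < curRev {
				// fill the gap above the cached range
				cached = append(cached, fetch(last+1, curRev+1)...)
			}
		}
		// drop revisions below the compaction revision; they can no longer be watched
		i := 0
		for i < len(cached) && cached[i] < compactionRev {
			i++
		}
		return cached[i:]
	}

	func main() {
		backend := func(lo, hi int64) []int64 { // stand-in for reading the key bucket
			var out []int64
			for r := lo; r < hi; r++ {
				out = append(out, r)
			}
			return out
		}
		cache := extendCache(nil, 5, 8, 3, backend)   // initial fill: [5 6 7 8]
		cache = extendCache(cache, 4, 10, 6, backend) // extend both ends, trim below 6
		fmt.Println(cache)                            // [6 7 8 9 10]
	}
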
@@ -416,21 +436,34 @@ func (s *watchableStore) syncWatchers() int {
	}
	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
-	return s.unsynced.size()
+	return s.unsynced.size(), evs
+}
+
+func (s *watchableStore) events(minRev, maxRev int64) []mvccpb.Event {
+	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
+	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
+	maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.ReadTx()
+	tx.RLock()
+	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
+	evs := kvsToEvents(s.store.lg, revs, vs)
+	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
+	// We can only unlock after Unmarshal, which will do deep copy.
+	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
+	tx.RUnlock()
+	return evs
}

// kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
	for i, v := range vals {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
		}
-		if !wg.contains(string(kv.Key)) {
-			continue
-		}
		ty := mvccpb.PUT
		if isTombstone(revs[i]) {
			ty = mvccpb.DELETE
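
With events now cached across sync passes, kvsToEvents can no longer drop keys that the current watcher group does not watch, which is why the wg.contains check is removed above; per-watcher filtering is left to newWatcherBatch(wg, evs[minRevIndex:]). A hedged illustration of the filtering that moves out of the decode step (keyEventsFor is a hypothetical helper, not etcd code; assumes the go.etcd.io/etcd/api/v3 module is available):

	package main

	import (
		"bytes"
		"fmt"

		"go.etcd.io/etcd/api/v3/mvccpb"
	)

	// keyEventsFor picks out of the shared, unfiltered event slice only the
	// events whose key a particular watcher is interested in.
	func keyEventsFor(evs []mvccpb.Event, watches func(key []byte) bool) []mvccpb.Event {
		var out []mvccpb.Event
		for _, ev := range evs {
			if watches(ev.Kv.Key) {
				out = append(out, ev)
			}
		}
		return out
	}

	func main() {
		evs := []mvccpb.Event{
			{Type: mvccpb.PUT, Kv: &mvccpb.KeyValue{Key: []byte("a"), ModRevision: 5}},
			{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("b"), ModRevision: 6}},
		}
		onlyA := keyEventsFor(evs, func(k []byte) bool { return bytes.Equal(k, []byte("a")) })
		fmt.Println(len(onlyA)) // 1
	}
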