[Draft] Reuse events used for syncing watchers
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
parent b81936dade
commit 05db32124e
@@ -216,6 +216,7 @@ func (s *watchableStore) syncWatchersLoop() {
 	waitDuration := 100 * time.Millisecond
 	delayTicker := time.NewTicker(waitDuration)
 	defer delayTicker.Stop()
+	var evs []mvccpb.Event
 
 	for {
 		s.mu.RLock()
@@ -225,7 +226,7 @@ func (s *watchableStore) syncWatchersLoop() {
 
 		unsyncedWatchers := 0
 		if lastUnsyncedWatchers > 0 {
-			unsyncedWatchers = s.syncWatchers()
+			unsyncedWatchers, evs = s.syncWatchers(evs)
 		}
 		syncDuration := time.Since(st)
 
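The two hunks above are the caller half of the reuse: evs is declared once outside the for loop and handed back to syncWatchers on every pass, so events fetched during one sync round stay available for the next one. A minimal, self-contained sketch of that call pattern follows; event and syncOnce are stand-in names for this sketch, not etcd code.

package main

import (
	"fmt"
	"time"
)

// event is a stand-in for mvccpb.Event in this sketch.
type event struct{ rev int64 }

// syncOnce pretends to sync watchers: it keeps whatever events the caller
// cached from the previous pass, appends only the revisions the cache does
// not cover yet, and hands the cache back.
func syncOnce(cache []event, curRev int64) (unsynced int, _ []event) {
	next := int64(1)
	if n := len(cache); n > 0 {
		next = cache[n-1].rev + 1
	}
	for rev := next; rev <= curRev; rev++ {
		cache = append(cache, event{rev: rev})
	}
	return 0, cache
}

func main() {
	var evs []event // declared once and reused across iterations, like evs in syncWatchersLoop
	for curRev := int64(3); curRev <= 9; curRev += 3 {
		var unsynced int
		unsynced, evs = syncOnce(evs, curRev)
		fmt.Printf("rev %d: %d unsynced, %d cached events\n", curRev, unsynced, len(evs))
		time.Sleep(10 * time.Millisecond) // stands in for the 100ms delayTicker wait
	}
}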
@@ -334,12 +335,12 @@ func (s *watchableStore) moveVictims() (moved int) {
 // 2. iterate over the set to get the minimum revision and remove compacted watchers
 // 3. use minimum revision to get all key-value pairs and send those events to watchers
 // 4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() int {
+func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if s.unsynced.size() == 0 {
-		return 0
+		return 0, evs
 	}
 
 	s.store.revMu.RLock()
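The new signature follows the same convention as append: syncWatchers may hand the cache back unchanged, re-sliced, or grown into a new backing array, so the caller has to reassign the returned slice, which is what syncWatchersLoop now does with unsyncedWatchers, evs = s.syncWatchers(evs). A small illustration of that contract; fill and cache are illustrative names, not etcd code.

package main

import "fmt"

// fill may grow the slice; like syncWatchers, it returns the cache to the caller.
func fill(cache []int, upTo int) (int, []int) {
	for v := len(cache); v < upTo; v++ {
		cache = append(cache, v) // append may allocate a new backing array
	}
	return upTo, cache
}

func main() {
	var cache []int

	// Reassign the returned slice, as syncWatchersLoop does with evs.
	_, cache = fill(cache, 3)
	fmt.Println(len(cache)) // 3

	// Ignoring the returned slice keeps the old (empty) slice header, so the
	// work done inside fill is invisible to the caller.
	var lost []int
	fill(lost, 3)
	fmt.Println(len(lost)) // 0
}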
@@ -352,23 +353,42 @@ func (s *watchableStore) syncWatchers() int {
 	compactionRev := s.store.compactMainRev
 
 	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
-	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
-	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
-	maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
-
-	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
-	// values are actual key-value pairs in backend.
-	tx := s.store.b.ReadTx()
-	tx.RLock()
-	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
-	evs := kvsToEvents(s.store.lg, wg, revs, vs)
-	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
-	// We can only unlock after Unmarshal, which will do deep copy.
-	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
-	tx.RUnlock()
+	if len(evs) == 0 {
+		evs = s.events(minRev, curRev+1)
+	} else {
+		if evs[0].Kv.ModRevision > minRev {
+			evs = append(s.events(minRev, evs[0].Kv.ModRevision), evs...)
+		}
+
+		if len(evs) == 0 {
+			evs = s.events(minRev, curRev+1)
+		}
+		if len(evs) != 0 && evs[len(evs)-1].Kv.ModRevision < curRev {
+			evs = append(evs, s.events(evs[len(evs)-1].Kv.ModRevision+1, curRev+1)...)
+		}
+	}
+	// TODO: Ensure thread safe access to victims
+	victimsMinRev := minRev
+	for _, vic := range s.victims {
+		for w := range vic {
+			if victimsMinRev > w.minRev {
+				victimsMinRev = w.minRev
+			}
+		}
+	}
+	victimsRevIndex := 0
+	for victimsRevIndex < len(evs) && evs[victimsRevIndex].Kv.ModRevision < compactionRev {
+		victimsRevIndex++
+	}
+	evs = evs[victimsRevIndex:]
+	minRevIndex := 0
+	for minRevIndex < len(evs) && evs[minRevIndex].Kv.ModRevision < minRev {
+		minRevIndex++
+	}
 
 	victims := make(watcherBatch)
-	wb := newWatcherBatch(wg, evs)
+	wb := newWatcherBatch(wg, evs[minRevIndex:])
 	for w := range wg.watchers {
 		w.minRev = curRev + 1
 
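This hunk is the core of the draft: instead of always ranging over the backend for [minRev, curRev], syncWatchers now tries to satisfy that window from the cached events and only reads the missing edges, prepending revisions older than the first cached event, appending revisions newer than the last one, then dropping everything below the compaction revision and skipping entries below minRev when building the watcher batch. A self-contained sketch of that fill-and-trim shape; ev and readRange are simplified stand-ins for mvccpb.Event and (*watchableStore).events, not etcd code.

package main

import "fmt"

// ev is a stand-in for mvccpb.Event; only the mod revision matters here.
type ev struct{ modRev int64 }

// readRange stands in for (*watchableStore).events: it returns the events
// with revisions in [minRev, maxRev) from some backend.
func readRange(minRev, maxRev int64) []ev {
	var out []ev
	for r := minRev; r < maxRev; r++ {
		out = append(out, ev{modRev: r})
	}
	return out
}

// fillAndTrim mirrors the shape of the new syncWatchers body: reuse cached
// events where possible, fetch only the missing edges, then trim.
func fillAndTrim(evs []ev, minRev, curRev, compactionRev int64) ([]ev, []ev) {
	if len(evs) == 0 {
		evs = readRange(minRev, curRev+1)
	} else {
		// Prepend the revisions older than the first cached event.
		if evs[0].modRev > minRev {
			evs = append(readRange(minRev, evs[0].modRev), evs...)
		}
		// Append the revisions newer than the last cached event.
		if len(evs) != 0 && evs[len(evs)-1].modRev < curRev {
			evs = append(evs, readRange(evs[len(evs)-1].modRev+1, curRev+1)...)
		}
	}
	// Drop events at revisions that have been compacted away.
	i := 0
	for i < len(evs) && evs[i].modRev < compactionRev {
		i++
	}
	evs = evs[i:]
	// The watcher batch only needs events at or above minRev.
	j := 0
	for j < len(evs) && evs[j].modRev < minRev {
		j++
	}
	return evs, evs[j:]
}

func main() {
	cached := []ev{{modRev: 5}, {modRev: 6}}   // left over from a previous sync
	all, batch := fillAndTrim(cached, 4, 8, 3) // need [4, 8], compacted below 3
	fmt.Println(len(all), len(batch))          // 5 5: revisions 4 through 8
}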
@@ -409,21 +429,34 @@ func (s *watchableStore) syncWatchers() int {
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 
-	return s.unsynced.size()
+	return s.unsynced.size(), evs
 }
 
+func (s *watchableStore) events(minRev, maxRev int64) []mvccpb.Event {
+	minBytes, maxBytes := NewRevBytes(), NewRevBytes()
+	minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
+	maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.ReadTx()
+	tx.RLock()
+	revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
+	evs := kvsToEvents(s.store.lg, revs, vs)
+	// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
+	// We can only unlock after Unmarshal, which will do deep copy.
+	// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
+	tx.RUnlock()
+	return evs
+}
+
 // kvsToEvents gets all events for the watchers from all key-value pairs
-func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
 	for i, v := range vals {
 		var kv mvccpb.KeyValue
 		if err := kv.Unmarshal(v); err != nil {
 			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
 		}
 
-		if !wg.contains(string(kv.Key)) {
-			continue
-		}
-
 		ty := mvccpb.PUT
 		if isTombstone(revs[i]) {
 			ty = mvccpb.DELETE
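The remaining changes extract the backend read into an events(minRev, maxRev) helper and drop the watcher-group parameter from kvsToEvents, so the decoded events are no longer tied to one watcher set and can be cached across sync passes; the per-watcher selection happens later, when newWatcherBatch matches events to watchers. A rough sketch of that decode-everything-then-filter split; the types and the filterForWatcher helper below are illustrative stand-ins, not etcd code.

package main

import "fmt"

// kv and event are simplified stand-ins for mvccpb.KeyValue and mvccpb.Event.
type kv struct {
	key    string
	modRev int64
}

type event struct {
	typ string // "PUT" or "DELETE"
	kv  kv
}

// decodeAll mirrors the new kvsToEvents: it turns every stored key-value pair
// into an event without consulting any particular watcher group, so the result
// can be cached and shared across sync passes.
func decodeAll(kvs []kv, tombstone map[int64]bool) []event {
	evs := make([]event, 0, len(kvs))
	for _, k := range kvs {
		typ := "PUT"
		if tombstone[k.modRev] {
			typ = "DELETE"
		}
		evs = append(evs, event{typ: typ, kv: k})
	}
	return evs
}

// filterForWatcher stands in for the per-watcher selection that now happens
// later (in newWatcherBatch) rather than inside kvsToEvents.
func filterForWatcher(evs []event, watched map[string]bool) []event {
	var out []event
	for _, e := range evs {
		if watched[e.kv.key] {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	stored := []kv{{"a", 4}, {"b", 5}, {"a", 6}}
	evs := decodeAll(stored, map[int64]bool{6: true}) // revision 6 is a tombstone
	mine := filterForWatcher(evs, map[string]bool{"a": true})
	fmt.Println(len(evs), len(mine)) // 3 2
}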