mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc: simplify watchable_store addVictim code
This commit is contained in:
parent
69015027b6
commit
e7a09db019
@ -358,7 +358,7 @@ func (s *watchableStore) syncWatchers() int {
|
|||||||
tx.RUnlock()
|
tx.RUnlock()
|
||||||
evs := kvsToEvents(s.store.lg, wg, revs, vs)
|
evs := kvsToEvents(s.store.lg, wg, revs, vs)
|
||||||
|
|
||||||
var victims watcherBatch
|
victims := make(watcherBatch)
|
||||||
wb := newWatcherBatch(wg, evs)
|
wb := newWatcherBatch(wg, evs)
|
||||||
for w := range wg.watchers {
|
for w := range wg.watchers {
|
||||||
w.minRev = curRev + 1
|
w.minRev = curRev + 1
|
||||||
@ -378,9 +378,6 @@ func (s *watchableStore) syncWatchers() int {
|
|||||||
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
|
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
|
||||||
pendingEventsGauge.Add(float64(len(eb.evs)))
|
pendingEventsGauge.Add(float64(len(eb.evs)))
|
||||||
} else {
|
} else {
|
||||||
if victims == nil {
|
|
||||||
victims = make(watcherBatch)
|
|
||||||
}
|
|
||||||
w.victim = true
|
w.victim = true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -432,7 +429,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
|
|||||||
// notify notifies the fact that given event at the given rev just happened to
|
// notify notifies the fact that given event at the given rev just happened to
|
||||||
// watchers that watch on the key of the event.
|
// watchers that watch on the key of the event.
|
||||||
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
||||||
var victim watcherBatch
|
victim := make(watcherBatch)
|
||||||
for w, eb := range newWatcherBatch(&s.synced, evs) {
|
for w, eb := range newWatcherBatch(&s.synced, evs) {
|
||||||
if eb.revs != 1 {
|
if eb.revs != 1 {
|
||||||
s.store.lg.Panic(
|
s.store.lg.Panic(
|
||||||
@ -445,9 +442,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
|||||||
} else {
|
} else {
|
||||||
// move slow watcher to victims
|
// move slow watcher to victims
|
||||||
w.minRev = rev + 1
|
w.minRev = rev + 1
|
||||||
if victim == nil {
|
|
||||||
victim = make(watcherBatch)
|
|
||||||
}
|
|
||||||
w.victim = true
|
w.victim = true
|
||||||
victim[w] = eb
|
victim[w] = eb
|
||||||
s.synced.delete(w)
|
s.synced.delete(w)
|
||||||
@ -458,7 +452,7 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *watchableStore) addVictim(victim watcherBatch) {
|
func (s *watchableStore) addVictim(victim watcherBatch) {
|
||||||
if victim == nil {
|
if len(victim) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.victims = append(s.victims, victim)
|
s.victims = append(s.victims, victim)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user