diff --git a/proxy/grpcproxy/watcher_group.go b/proxy/grpcproxy/watcher_group.go index c7b7d9f60..4bce9ca46 100644 --- a/proxy/grpcproxy/watcher_group.go +++ b/proxy/grpcproxy/watcher_group.go @@ -68,10 +68,14 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) { } // add adds the watcher into the group with given ID. -// The current revision of the watcherGroup is returned. +// The current revision of the watcherGroup is returned or -1 +// if the watcher is at a revision prior to the watcher group. func (wg *watcherGroup) add(rid receiverID, w watcher) int64 { wg.mu.Lock() defer wg.mu.Unlock() + if wg.rev > w.rev { + return -1 + } wg.receivers[rid] = w return wg.rev } diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index c99937248..2b92beee9 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -102,11 +102,7 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl group, ok := wgs.groups[ws.w.wr] if ok { - if ws.w.rev >= group.rev { - group.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w) - return true - } - return false + return group.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w) != -1 } if ws.canPromote() {