diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index c212f44ff..96f856f2d 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -39,9 +39,10 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { wp := &watchProxy{ cw: c.Watcher, wgs: watchergroups{ - cw: c.Watcher, - groups: make(map[watchRange]*watcherGroup), - proxyCtx: c.Ctx(), + cw: c.Watcher, + groups: make(map[watchRange]*watcherGroup), + idToGroup: make(map[receiverID]*watcherGroup), + proxyCtx: c.Ctx(), }, ctx: c.Ctx(), } @@ -65,7 +66,6 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { id: wp.nextStreamID, gRPCStream: stream, - ctrlCh: make(chan *pb.WatchResponse, 10), watchCh: make(chan *pb.WatchResponse, 10), proxyCtx: wp.ctx, @@ -86,7 +86,6 @@ type serverWatchStream struct { gRPCStream pb.Watch_WatchServer - ctrlCh chan *pb.WatchResponse watchCh chan *pb.WatchResponse nextWatcherID int64 @@ -96,7 +95,6 @@ type serverWatchStream struct { func (sws *serverWatchStream) close() { close(sws.watchCh) - close(sws.ctrlCh) var wg sync.WaitGroup sws.mu.Lock() @@ -166,14 +164,6 @@ func (sws *serverWatchStream) sendLoop() { if err := sws.gRPCStream.Send(wresp); err != nil { return } - - case c, ok := <-sws.ctrlCh: - if !ok { - return - } - if err := sws.gRPCStream.Send(c); err != nil { - return - } case <-sws.proxyCtx.Done(): return } @@ -222,12 +212,37 @@ func (sws *serverWatchStream) removeWatcher(id int64) { sws.mu.Lock() defer sws.mu.Unlock() - if sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) { + var ( + rev int64 + ok bool + ) + + defer func() { + if !ok { + return + } + resp := &pb.WatchResponse{ + Header: &pb.ResponseHeader{ + // todo: fill in ClusterId + // todo: fill in MemberId: + Revision: rev, + // todo: fill in RaftTerm: + }, + WatchId: id, + Canceled: true, + } + sws.watchCh <- resp + }() + + rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) + if ok { return } - if ws, ok := sws.singles[id]; ok { + var ws *watcherSingle + if ws, ok = sws.singles[id]; ok { delete(sws.singles, id) ws.stop() + rev = ws.lastStoreRev } } diff --git a/proxy/grpcproxy/watcher_group.go b/proxy/grpcproxy/watcher_group.go index 8d8616a89..00f1b9d8b 100644 --- a/proxy/grpcproxy/watcher_group.go +++ b/proxy/grpcproxy/watcher_group.go @@ -70,7 +70,6 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) { func (wg *watcherGroup) add(rid receiverID, w watcher) { wg.mu.Lock() defer wg.mu.Unlock() - wg.receivers[rid] = w } @@ -92,3 +91,9 @@ func (wg *watcherGroup) stop() { wg.cancel() <-wg.donec } + +func (wg *watcherGroup) revision() int64 { + wg.mu.Lock() + defer wg.mu.Unlock() + return wg.rev +} diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index c0570e3e8..2e040aded 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -39,6 +39,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { if wg, ok := groups[w.wr]; ok { wg.add(rid, w) + wgs.idToGroup[rid] = wg return } @@ -54,20 +55,22 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { watchg.add(rid, w) go watchg.run() groups[w.wr] = watchg + wgs.idToGroup[rid] = watchg } -func (wgs *watchergroups) removeWatcher(rid receiverID) bool { +func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) { wgs.mu.Lock() defer wgs.mu.Unlock() if g, ok := wgs.idToGroup[rid]; ok { g.delete(rid) + delete(wgs.idToGroup, rid) if g.isEmpty() { g.stop() } - return true + return g.revision(), true } - return false + return -1, false } func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {