From 51b4d6b7a8d59e97e985f7f79ecdb94872642d38 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 2 Sep 2016 11:46:16 -0700 Subject: [PATCH] grpcproxy: support cancel watcher We do not wait for the cancellation from actual etcd server, but generate it at the proxy side. The rule is to return the latest rev that the watcher has seen. This should be good enough for most use cases if not all. --- proxy/grpcproxy/watch.go | 47 ++++++++++++++++++++----------- proxy/grpcproxy/watcher_group.go | 7 ++++- proxy/grpcproxy/watcher_groups.go | 9 ++++-- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index c212f44ff..96f856f2d 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -39,9 +39,10 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { wp := &watchProxy{ cw: c.Watcher, wgs: watchergroups{ - cw: c.Watcher, - groups: make(map[watchRange]*watcherGroup), - proxyCtx: c.Ctx(), + cw: c.Watcher, + groups: make(map[watchRange]*watcherGroup), + idToGroup: make(map[receiverID]*watcherGroup), + proxyCtx: c.Ctx(), }, ctx: c.Ctx(), } @@ -65,7 +66,6 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { id: wp.nextStreamID, gRPCStream: stream, - ctrlCh: make(chan *pb.WatchResponse, 10), watchCh: make(chan *pb.WatchResponse, 10), proxyCtx: wp.ctx, @@ -86,7 +86,6 @@ type serverWatchStream struct { gRPCStream pb.Watch_WatchServer - ctrlCh chan *pb.WatchResponse watchCh chan *pb.WatchResponse nextWatcherID int64 @@ -96,7 +95,6 @@ type serverWatchStream struct { func (sws *serverWatchStream) close() { close(sws.watchCh) - close(sws.ctrlCh) var wg sync.WaitGroup sws.mu.Lock() @@ -166,14 +164,6 @@ func (sws *serverWatchStream) sendLoop() { if err := sws.gRPCStream.Send(wresp); err != nil { return } - - case c, ok := <-sws.ctrlCh: - if !ok { - return - } - if err := sws.gRPCStream.Send(c); err != nil { - return - } case <-sws.proxyCtx.Done(): return } @@ -222,12 +212,37 @@ func (sws *serverWatchStream) removeWatcher(id int64) { sws.mu.Lock() defer sws.mu.Unlock() - if sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) { + var ( + rev int64 + ok bool + ) + + defer func() { + if !ok { + return + } + resp := &pb.WatchResponse{ + Header: &pb.ResponseHeader{ + // todo: fill in ClusterId + // todo: fill in MemberId: + Revision: rev, + // todo: fill in RaftTerm: + }, + WatchId: id, + Canceled: true, + } + sws.watchCh <- resp + }() + + rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id}) + if ok { return } - if ws, ok := sws.singles[id]; ok { + var ws *watcherSingle + if ws, ok = sws.singles[id]; ok { delete(sws.singles, id) ws.stop() + rev = ws.lastStoreRev } } diff --git a/proxy/grpcproxy/watcher_group.go b/proxy/grpcproxy/watcher_group.go index 8d8616a89..00f1b9d8b 100644 --- a/proxy/grpcproxy/watcher_group.go +++ b/proxy/grpcproxy/watcher_group.go @@ -70,7 +70,6 @@ func (wg *watcherGroup) broadcast(wr clientv3.WatchResponse) { func (wg *watcherGroup) add(rid receiverID, w watcher) { wg.mu.Lock() defer wg.mu.Unlock() - wg.receivers[rid] = w } @@ -92,3 +91,9 @@ func (wg *watcherGroup) stop() { wg.cancel() <-wg.donec } + +func (wg *watcherGroup) revision() int64 { + wg.mu.Lock() + defer wg.mu.Unlock() + return wg.rev +} diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index c0570e3e8..2e040aded 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -39,6 +39,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { if wg, ok := groups[w.wr]; ok { wg.add(rid, w) + wgs.idToGroup[rid] = wg return } @@ -54,20 +55,22 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { watchg.add(rid, w) go watchg.run() groups[w.wr] = watchg + wgs.idToGroup[rid] = watchg } -func (wgs *watchergroups) removeWatcher(rid receiverID) bool { +func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) { wgs.mu.Lock() defer wgs.mu.Unlock() if g, ok := wgs.idToGroup[rid]; ok { g.delete(rid) + delete(wgs.idToGroup, rid) if g.isEmpty() { g.stop() } - return true + return g.revision(), true } - return false + return -1, false } func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {