From 9b5eb1ae5a0a3e9ec23a94b11c0020b2ce5db488 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 30 Dec 2016 11:24:23 -0800 Subject: [PATCH] grpcproxy, etcdmain, integration: return done channel with WatchServer Makes it possible to synchronously close the watch server. Fixes #7078 --- etcdmain/grpc_proxy.go | 2 +- integration/cluster_proxy.go | 37 +++++++++++++++++++++++++++++------- proxy/grpcproxy/watch.go | 6 ++++-- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index c85fd5d17..2a0488bb1 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -104,7 +104,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } kvp := grpcproxy.NewKvProxy(client) - watchp := grpcproxy.NewWatchProxy(client) + watchp, _ := grpcproxy.NewWatchProxy(client) clusterp := grpcproxy.NewClusterProxy(client) leasep := grpcproxy.NewLeaseProxy(client) mainp := grpcproxy.NewMaintenanceProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index c5c76774f..ea4bbec03 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -26,25 +26,43 @@ import ( var ( pmu sync.Mutex - proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI) + proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy) ) +type grpcClientProxy struct { + grpc grpcAPI + wdonec <-chan struct{} +} + func toGRPC(c *clientv3.Client) grpcAPI { pmu.Lock() defer pmu.Unlock() if v, ok := proxies[c]; ok { - return v + return v.grpc } - api := grpcAPI{ + + wp, wpch := grpcproxy.NewWatchProxy(c) + grpc := grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)), pb.NewLeaseClient(c.ActiveConnection()), - grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)), + grpcproxy.WatchServerToWatchClient(wp), pb.NewMaintenanceClient(c.ActiveConnection()), } - proxies[c] = api - return api + proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch} + return grpc +} + +type watchCloser struct { + clientv3.Watcher + wdonec <-chan struct{} +} + +func (wc *watchCloser) Close() error { + err := wc.Watcher.Close() + <-wc.wdonec + return err } func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { @@ -54,6 +72,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { } rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) - c.Watcher = clientv3.NewWatchFromWatchClient(rpc.Watch) + pmu.Lock() + c.Watcher = &watchCloser{ + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), + wdonec: proxies[c].wdonec, + } + pmu.Unlock() return c, nil } diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 88c63af7a..2c070b31c 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -49,7 +49,7 @@ const ( retryPerSecond = 10 ) -func NewWatchProxy(c *clientv3.Client) pb.WatchServer { +func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { wp := &watchProxy{ cw: c.Watcher, ctx: clientv3.WithRequireLeader(c.Ctx()), @@ -57,7 +57,9 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { leaderc: make(chan struct{}), } wp.ranges = newWatchRanges(wp) + ch := make(chan struct{}) go func() { + defer close(ch) // a new streams without opening any watchers won't catch // a lost leader event, so have a special watch to monitor it rev := int64((uint64(1) << 63) - 2) @@ -77,7 +79,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { wp.wg.Wait() wp.ranges.stop() }() - return wp + return wp, ch } func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {