diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 2a0488bb1..f0b5ecdc7 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -103,7 +103,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { os.Exit(1) } - kvp := grpcproxy.NewKvProxy(client) + kvp, _ := grpcproxy.NewKvProxy(client) watchp, _ := grpcproxy.NewWatchProxy(client) clusterp := grpcproxy.NewClusterProxy(client) leasep := grpcproxy.NewLeaseProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 4392489c6..75319218e 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -30,8 +30,9 @@ var ( ) type grpcClientProxy struct { - grpc grpcAPI - wdonec <-chan struct{} + grpc grpcAPI + wdonec <-chan struct{} + kvdonec <-chan struct{} } func toGRPC(c *clientv3.Client) grpcAPI { @@ -43,26 +44,30 @@ func toGRPC(c *clientv3.Client) grpcAPI { } wp, wpch := grpcproxy.NewWatchProxy(c) + kvp, kvpch := grpcproxy.NewKvProxy(c) grpc := grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), - grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)), + grpcproxy.KvServerToKvClient(kvp), pb.NewLeaseClient(c.ActiveConnection()), grpcproxy.WatchServerToWatchClient(wp), pb.NewMaintenanceClient(c.ActiveConnection()), pb.NewAuthClient(c.ActiveConnection()), } - proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch} + proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch} return grpc } -type watchCloser struct { +type proxyCloser struct { clientv3.Watcher - wdonec <-chan struct{} + wdonec <-chan struct{} + kvdonec <-chan struct{} } -func (wc *watchCloser) Close() error { - err := wc.Watcher.Close() - <-wc.wdonec +func (pc *proxyCloser) Close() error { + // client ctx is canceled before calling close, so kv will close out + <-pc.kvdonec + err := pc.Watcher.Close() + <-pc.wdonec return err } @@ -74,9 +79,10 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) pmu.Lock() - c.Watcher = &watchCloser{ + c.Watcher = &proxyCloser{ Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), wdonec: proxies[c].wdonec, + kvdonec: proxies[c].kvdonec, } pmu.Unlock() return c, nil diff --git a/proxy/grpcproxy/cache/store.go b/proxy/grpcproxy/cache/store.go index b348d1803..155bbf900 100644 --- a/proxy/grpcproxy/cache/store.go +++ b/proxy/grpcproxy/cache/store.go @@ -39,6 +39,7 @@ type Cache interface { Get(req *pb.RangeRequest) (*pb.RangeResponse, error) Compact(revision int64) Invalidate(key []byte, endkey []byte) + Close() } // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response. @@ -58,6 +59,8 @@ func NewCache(maxCacheEntries int) Cache { } } +func (c *cache) Close() { c.lru.Stop() } + // cache implements Cache type cache struct { mu sync.RWMutex diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go index 1d6cf452a..368851357 100644 --- a/proxy/grpcproxy/kv.go +++ b/proxy/grpcproxy/kv.go @@ -27,11 +27,18 @@ type kvProxy struct { cache cache.Cache } -func NewKvProxy(c *clientv3.Client) pb.KVServer { - return &kvProxy{ +func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) { + kv := &kvProxy{ kv: c.KV, cache: cache.NewCache(cache.DefaultMaxEntries), } + donec := make(chan struct{}) + go func() { + defer close(donec) + <-c.Ctx().Done() + kv.cache.Close() + }() + return kv, donec } func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { diff --git a/proxy/grpcproxy/kv_test.go b/proxy/grpcproxy/kv_test.go index 4475c43b9..b0fecc37a 100644 --- a/proxy/grpcproxy/kv_test.go +++ b/proxy/grpcproxy/kv_test.go @@ -76,7 +76,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer { t.Fatal(err) } - kvp := NewKvProxy(client) + kvp, _ := NewKvProxy(client) kvts := &kvproxyTestServer{ kp: kvp,