From 65b59f44234861b92ad11f49a3b26dd98ae48cac Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Mon, 13 Feb 2017 13:38:46 -0800 Subject: [PATCH] grpcproxy: incorporate lease proxy into existing proxy framework --- clientv3/lease.go | 9 ++++++--- etcdmain/grpc_proxy.go | 2 +- integration/cluster_proxy.go | 15 ++++++++++----- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/clientv3/lease.go b/clientv3/lease.go index 2ef5df389..85289fe86 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -144,16 +144,19 @@ type keepAlive struct { } func NewLease(c *Client) Lease { + return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second) +} + +func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease { l := &lessor{ donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), - remote: RetryLeaseClient(c), - firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second, + remote: remote, + firstKeepAliveTimeout: keepAliveTimeout, } if l.firstKeepAliveTimeout == time.Second { l.firstKeepAliveTimeout = defaultTTL } - l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) return l } diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index f0b5ecdc7..b92825693 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -106,7 +106,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { kvp, _ := grpcproxy.NewKvProxy(client) watchp, _ := grpcproxy.NewWatchProxy(client) clusterp := grpcproxy.NewClusterProxy(client) - leasep := grpcproxy.NewLeaseProxy(client) + leasep, _ := grpcproxy.NewLeaseProxy(client) mainp := grpcproxy.NewMaintenanceProxy(client) authp := grpcproxy.NewAuthProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 75319218e..4371f0b2a 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -33,6 +33,7 @@ type grpcClientProxy struct { grpc grpcAPI wdonec <-chan struct{} kvdonec <-chan struct{} + lpdonec <-chan struct{} } func toGRPC(c *clientv3.Client) grpcAPI { @@ -42,18 +43,18 @@ func toGRPC(c *clientv3.Client) grpcAPI { if v, ok := proxies[c]; ok { return v.grpc } - - wp, wpch := grpcproxy.NewWatchProxy(c) kvp, kvpch := grpcproxy.NewKvProxy(c) + wp, wpch := grpcproxy.NewWatchProxy(c) + lp, lpch := grpcproxy.NewLeaseProxy(c) grpc := grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), grpcproxy.KvServerToKvClient(kvp), - pb.NewLeaseClient(c.ActiveConnection()), + grpcproxy.LeaseServerToLeaseClient(lp), grpcproxy.WatchServerToWatchClient(wp), pb.NewMaintenanceClient(c.ActiveConnection()), pb.NewAuthClient(c.ActiveConnection()), } - proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch} + proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} return grpc } @@ -61,13 +62,15 @@ type proxyCloser struct { clientv3.Watcher wdonec <-chan struct{} kvdonec <-chan struct{} + lpdonec <-chan struct{} } func (pc *proxyCloser) Close() error { - // client ctx is canceled before calling close, so kv will close out + // client ctx is canceled before calling close, so kv and lp will close out <-pc.kvdonec err := pc.Watcher.Close() <-pc.wdonec + <-pc.lpdonec return err } @@ -79,10 +82,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { rpc := toGRPC(c) c.KV = clientv3.NewKVFromKVClient(rpc.KV) pmu.Lock() + c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) c.Watcher = &proxyCloser{ Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), wdonec: proxies[c].wdonec, kvdonec: proxies[c].kvdonec, + lpdonec: proxies[c].lpdonec, } pmu.Unlock() return c, nil