From 1c9fa07cd757fc93f9185ac2c9c74e8af35d26b9 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Tue, 28 Jun 2022 13:44:47 +0800 Subject: [PATCH] Fix deadlock in 'go test -tags cluster_proxy -v ./integration/... ./clientv3/...' Cherry pick https://github.com/etcd-io/etcd/pull/12319 to 3.4. Signed-off-by: Benjamin Wang --- clientv3/naming/grpc_test.go | 7 +++--- etcdmain/grpc_proxy.go | 4 ++-- integration/cluster_proxy.go | 44 ++++++++++++++++++++++-------------- proxy/grpcproxy/lease.go | 6 ++--- proxy/grpcproxy/watch.go | 6 ++--- 5 files changed, 39 insertions(+), 28 deletions(-) diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go index 66a1feae0..0041a89a8 100644 --- a/clientv3/naming/grpc_test.go +++ b/clientv3/naming/grpc_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package naming +package naming_test import ( "context" @@ -21,6 +21,7 @@ import ( "testing" etcd "go.etcd.io/etcd/clientv3" + namingv3 "go.etcd.io/etcd/clientv3/naming" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" @@ -33,7 +34,7 @@ func TestGRPCResolver(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - r := GRPCResolver{ + r := namingv3.GRPCResolver{ Client: clus.RandClient(), } @@ -107,7 +108,7 @@ func TestGRPCResolverMulti(t *testing.T) { t.Fatal(err) } - r := GRPCResolver{c} + r := namingv3.GRPCResolver{c} w, err := r.Resolve("foo") if err != nil { diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 323d1d942..a2688f5aa 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -350,12 +350,12 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } kvp, _ := grpcproxy.NewKvProxy(client) - watchp, _ := grpcproxy.NewWatchProxy(client) + watchp, _ := grpcproxy.NewWatchProxy(client.Ctx(), client) if grpcProxyResolverPrefix != "" { grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL) } clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix) - leasep, _ := grpcproxy.NewLeaseProxy(client) + leasep, _ := grpcproxy.NewLeaseProxy(client.Ctx(), client) mainp := grpcproxy.NewMaintenanceProxy(client) authp := grpcproxy.NewAuthProxy(client) electionp := grpcproxy.NewElectionProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 4a09a48a3..080b55f90 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -17,6 +17,7 @@ package integration import ( + "context" "sync" "go.etcd.io/etcd/clientv3" @@ -35,16 +36,23 @@ var ( const proxyNamespace = "proxy-namespace" type grpcClientProxy struct { - grpc grpcAPI - wdonec <-chan struct{} - kvdonec <-chan struct{} - lpdonec <-chan struct{} + ctx context.Context + ctxCancel func() + grpc grpcAPI + wdonec <-chan struct{} + kvdonec <-chan struct{} + lpdonec <-chan struct{} } func toGRPC(c *clientv3.Client) grpcAPI { pmu.Lock() defer pmu.Unlock() + // dedicated context bound to 'grpc-proxy' lifetype + // (so in practice lifetime of the client connection to the proxy). + // TODO: Refactor to a separate clientv3.Client instance instead of the context alone. + ctx, ctxCancel := context.WithCancel(context.WithValue(context.TODO(), "_name", "grpcProxyContext")) + if v, ok := proxies[c]; ok { return v.grpc } @@ -55,8 +63,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { c.Lease = namespace.NewLease(c.Lease, proxyNamespace) // test coalescing/caching proxy kvp, kvpch := grpcproxy.NewKvProxy(c) - wp, wpch := grpcproxy.NewWatchProxy(c) - lp, lpch := grpcproxy.NewLeaseProxy(c) + wp, wpch := grpcproxy.NewWatchProxy(ctx, c) + lp, lpch := grpcproxy.NewLeaseProxy(ctx, c) mp := grpcproxy.NewMaintenanceProxy(c) clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs authp := grpcproxy.NewAuthProxy(c) @@ -73,20 +81,21 @@ func toGRPC(c *clientv3.Client) grpcAPI { adapter.LockServerToLockClient(lockp), adapter.ElectionServerToElectionClient(electp), } - proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} + proxies[c] = grpcClientProxy{ctx: ctx, ctxCancel: ctxCancel, grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} return grpc } type proxyCloser struct { clientv3.Watcher - wdonec <-chan struct{} - kvdonec <-chan struct{} - lclose func() - lpdonec <-chan struct{} + proxyCtxCancel func() + wdonec <-chan struct{} + kvdonec <-chan struct{} + lclose func() + lpdonec <-chan struct{} } func (pc *proxyCloser) Close() error { - // client ctx is canceled before calling close, so kv and lp will close out + pc.proxyCtxCancel() <-pc.kvdonec err := pc.Watcher.Close() <-pc.wdonec @@ -106,11 +115,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout) c.Watcher = &proxyCloser{ - Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), - wdonec: proxies[c].wdonec, - kvdonec: proxies[c].kvdonec, - lclose: func() { lc.Close() }, - lpdonec: proxies[c].lpdonec, + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), + wdonec: proxies[c].wdonec, + kvdonec: proxies[c].kvdonec, + lclose: func() { lc.Close() }, + lpdonec: proxies[c].lpdonec, + proxyCtxCancel: proxies[c].ctxCancel, } pmu.Unlock() return c, nil diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index a6e5515ae..237176559 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -48,13 +48,13 @@ type leaseProxy struct { wg sync.WaitGroup } -func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) { - cctx, cancel := context.WithCancel(c.Ctx()) +func NewLeaseProxy(ctx context.Context, c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) { + cctx, cancel := context.WithCancel(ctx) lp := &leaseProxy{ leaseClient: pb.NewLeaseClient(c.ActiveConnection()), lessor: c.Lease, ctx: cctx, - leader: newLeader(c.Ctx(), c.Watcher), + leader: newLeader(ctx, c.Watcher), } ch := make(chan struct{}) go func() { diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 8b0c4c003..770941b8d 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -46,12 +46,12 @@ type watchProxy struct { kv clientv3.KV } -func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { - cctx, cancel := context.WithCancel(c.Ctx()) +func NewWatchProxy(ctx context.Context, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { + cctx, cancel := context.WithCancel(ctx) wp := &watchProxy{ cw: c.Watcher, ctx: cctx, - leader: newLeader(c.Ctx(), c.Watcher), + leader: newLeader(ctx, c.Watcher), kv: c.KV, // for permission checking }