From f65ff885114d64c3b3afb0b12edfd83822996717 Mon Sep 17 00:00:00 2001 From: Ivan Valdes Date: Mon, 29 Apr 2024 10:22:00 -0700 Subject: [PATCH] grpcproxy: fix memberlist results not updating when a proxy node goes down If the gRPC proxy is started with `--resolver-prefix`, its memberlist returns all alive proxy nodes. When one gRPC proxy node goes down, the memberlist should no longer include that node; however, the node was still being returned. Backport of PR #15907 / commit ecfed91e5050431385961573e5c09658a66a59d5. Co-authored-by: yellowzf Signed-off-by: Ivan Valdes --- proxy/grpcproxy/cluster.go | 8 ++--- proxy/grpcproxy/cluster_test.go | 59 +++++++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 338827d46..cd25e1867 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -105,9 +105,9 @@ func (cp *clusterProxy) monitor(wc endpoints.WatchChannel) { for _, up := range updates { switch up.Op { case endpoints.Add: - cp.umap[up.Endpoint.Addr] = up.Endpoint + cp.umap[up.Key] = up.Endpoint case endpoints.Delete: - delete(cp.umap, up.Endpoint.Addr) + delete(cp.umap, up.Key) } } cp.umu.Unlock() @@ -162,12 +162,12 @@ func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) { cp.umu.RLock() defer cp.umu.RUnlock() mbs := make([]*pb.Member, 0, len(cp.umap)) - for addr, upt := range cp.umap { + for _, upt := range cp.umap { m, err := decodeMeta(fmt.Sprint(upt.Metadata)) if err != nil { return nil, err } - mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}}) + mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}}) } return mbs, nil } diff --git a/proxy/grpcproxy/cluster_test.go b/proxy/grpcproxy/cluster_test.go index 778681744..adca0664a 100644 --- a/proxy/grpcproxy/cluster_test.go +++ b/proxy/grpcproxy/cluster_test.go @@ -17,10 +17,13 @@ package grpcproxy import ( "context" "net" + "os" "testing" "time" + "github.com/stretchr/testify/assert" 
"go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/naming/endpoints" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" @@ -34,7 +37,10 @@ func TestClusterProxyMemberList(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t) + serverEps := []string{clus.Members[0].GRPCAddr()} + prefix := "test-prefix" + hostname, _ := os.Hostname() + cts := newClusterProxyServer(serverEps, prefix, t) defer cts.close(t) cfg := clientv3.Config{ @@ -48,7 +54,7 @@ func TestClusterProxyMemberList(t *testing.T) { defer client.Close() // wait some time for register-loop to write keys - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) var mresp *clientv3.MemberListResponse mresp, err = client.Cluster.MemberList(context.Background()) @@ -62,9 +68,38 @@ func TestClusterProxyMemberList(t *testing.T) { if len(mresp.Members[0].ClientURLs) != 1 { t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0]) } - if mresp.Members[0].ClientURLs[0] != cts.caddr { - t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0]) + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}}) + + //test proxy member add + newMemberAddr := "127.0.0.2:6789" + Register(cts.c, prefix, newMemberAddr, 7) + // wait some time for proxy update members + time.Sleep(200 * time.Millisecond) + + //check add member succ + mresp, err = client.Cluster.MemberList(context.Background()) + if err != nil { + t.Fatalf("err %v, want nil", err) } + if len(mresp.Members) != 2 { + t.Fatalf("len(mresp.Members) expected 2, got %d (%+v)", len(mresp.Members), mresp.Members) + } + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{newMemberAddr}}) + + 
//test proxy member delete + deregisterMember(cts.c, prefix, newMemberAddr, t) + // wait some time for proxy update members + time.Sleep(200 * time.Millisecond) + + //check delete member succ + mresp, err = client.Cluster.MemberList(context.Background()) + if err != nil { + t.Fatalf("err %v, want nil", err) + } + if len(mresp.Members) != 1 { + t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members) + } + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}}) } type clusterproxyTestServer struct { @@ -88,7 +123,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) { } } -func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer { +func newClusterProxyServer(endpoints []string, prefix string, t *testing.T) *clusterproxyTestServer { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, @@ -113,8 +148,8 @@ func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestSe cts.server.Serve(cts.l) }() - Register(client, "test-prefix", cts.l.Addr().String(), 7) - cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix") + Register(client, prefix, cts.l.Addr().String(), 7) + cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), prefix) cts.caddr = cts.l.Addr().String() pb.RegisterClusterServer(cts.server, cts.cp) close(servec) @@ -124,3 +159,13 @@ func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestSe return cts } + +func deregisterMember(c *clientv3.Client, prefix, addr string, t *testing.T) { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + t.Fatalf("new endpoint manager failed, err %v", err) + } + if err = em.DeleteEndpoint(c.Ctx(), prefix+"/"+addr); err != nil { + t.Fatalf("delete endpoint failed, err %v", err) + } +}