From ecfed91e5050431385961573e5c09658a66a59d5 Mon Sep 17 00:00:00 2001 From: yellowzf Date: Sat, 6 May 2023 11:29:19 +0800 Subject: [PATCH] grpcproxy: fix memberlist results not update when proxy node down If start grpc proxy with --resolver-prefix, memberlist will return all alive proxy nodes, when one grpc proxy node is down, it is expected to not return the down node, but it is still return Signed-off-by: yellowzf --- server/proxy/grpcproxy/cluster.go | 8 +-- .../proxy/grpcproxy/cluster_test.go | 62 ++++++++++++++++--- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/server/proxy/grpcproxy/cluster.go b/server/proxy/grpcproxy/cluster.go index 1f7dccbe7..e225fd8ef 100644 --- a/server/proxy/grpcproxy/cluster.go +++ b/server/proxy/grpcproxy/cluster.go @@ -112,9 +112,9 @@ func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) { for _, up := range updates { switch up.Op { case endpoints.Add: - cp.umap[up.Endpoint.Addr] = up.Endpoint + cp.umap[up.Key] = up.Endpoint case endpoints.Delete: - delete(cp.umap, up.Endpoint.Addr) + delete(cp.umap, up.Key) } } cp.umu.Unlock() @@ -169,12 +169,12 @@ func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) { cp.umu.RLock() defer cp.umu.RUnlock() mbs := make([]*pb.Member, 0, len(cp.umap)) - for addr, upt := range cp.umap { + for _, upt := range cp.umap { m, err := decodeMeta(fmt.Sprint(upt.Metadata)) if err != nil { return nil, err } - mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}}) + mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}}) } return mbs, nil } diff --git a/tests/integration/proxy/grpcproxy/cluster_test.go b/tests/integration/proxy/grpcproxy/cluster_test.go index 162956444..d7b8d70cb 100644 --- a/tests/integration/proxy/grpcproxy/cluster_test.go +++ b/tests/integration/proxy/grpcproxy/cluster_test.go @@ -17,15 +17,18 @@ package grpcproxy import ( "context" "net" + "os" "testing" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/server/v3/proxy/grpcproxy" "go.etcd.io/etcd/tests/v3/integration" "go.uber.org/zap/zaptest" + "github.com/stretchr/testify/assert" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -36,7 +39,11 @@ func TestClusterProxyMemberList(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t) + lg := zaptest.NewLogger(t) + serverEps := []string{clus.Members[0].GRPCURL()} + prefix := "test-prefix" + hostname, _ := os.Hostname() + cts := newClusterProxyServer(lg, serverEps, prefix, t) defer cts.close(t) cfg := clientv3.Config{ @@ -50,7 +57,7 @@ func TestClusterProxyMemberList(t *testing.T) { defer client.Close() // wait some time for register-loop to write keys - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) var mresp *clientv3.MemberListResponse mresp, err = client.Cluster.MemberList(context.Background()) @@ -64,9 +71,38 @@ func TestClusterProxyMemberList(t *testing.T) { if len(mresp.Members[0].ClientURLs) != 1 { t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0]) } - if mresp.Members[0].ClientURLs[0] != cts.caddr { - t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0]) + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}}) + + //test proxy member add + newMemberAddr := "127.0.0.2:6789" + grpcproxy.Register(lg, cts.c, prefix, newMemberAddr, 7) + // wait some time for proxy update members + time.Sleep(200 * time.Millisecond) + + //check add member succ + mresp, err = client.Cluster.MemberList(context.Background()) + if err != nil { + t.Fatalf("err %v, want nil", err) } + if len(mresp.Members) != 2 { + t.Fatalf("len(mresp.Members) expected 2, got %d (%+v)", len(mresp.Members), mresp.Members) + } + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{newMemberAddr}}) + + //test proxy member delete + deregisterMember(cts.c, prefix, newMemberAddr, t) + // wait some time for proxy update members + time.Sleep(200 * time.Millisecond) + + //check delete member succ + mresp, err = client.Cluster.MemberList(context.Background()) + if err != nil { + t.Fatalf("err %v, want nil", err) + } + if len(mresp.Members) != 1 { + t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members) + } + assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}}) } type clusterproxyTestServer struct { @@ -90,7 +126,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) { } } -func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *clusterproxyTestServer { +func newClusterProxyServer(lg *zap.Logger, endpoints []string, prefix string, t *testing.T) *clusterproxyTestServer { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, @@ -115,8 +151,8 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl cts.server.Serve(cts.l) }() - grpcproxy.Register(lg, client, "test-prefix", cts.l.Addr().String(), 7) - cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), "test-prefix") + grpcproxy.Register(lg, client, prefix, cts.l.Addr().String(), 7) + cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), prefix) cts.caddr = cts.l.Addr().String() pb.RegisterClusterServer(cts.server, cts.cp) close(servec) @@ -126,3 +162,13 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl return cts } + +func deregisterMember(c *clientv3.Client, prefix, addr string, t *testing.T) { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + t.Fatalf("new endpoint manager failed, err %v", err) + } + if err = em.DeleteEndpoint(c.Ctx(), prefix+"/"+addr); err != nil { + t.Fatalf("delete endpoint failed, err %v", err) + } +}