From 571ed502d46c074d34bbf7a01feceecf4e05cacb Mon Sep 17 00:00:00 2001 From: limeng01 Date: Thu, 4 Feb 2021 16:00:00 +0800 Subject: [PATCH] endpoints: implement Update method for EndpointManager. - Add integration test for endpoints and resolver. --- client/v3/naming/endpoints/endpoints_impl.go | 85 +++++++++++++---- pkg/go.sum | 5 + pkg/grpc_testing/stub_server.go | 86 +++++++++++++++++ .../clientv3/naming/endpoints_test.go | 81 ++++++++++++++++ .../clientv3/naming/resolver_test.go | 93 +++++++++++++++---- 5 files changed, 312 insertions(+), 38 deletions(-) create mode 100644 pkg/grpc_testing/stub_server.go diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go index 1fef5ea37..0fb848fa2 100644 --- a/client/v3/naming/endpoints/endpoints_impl.go +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -4,37 +4,69 @@ package endpoints import ( "context" + "encoding/json" + "errors" "fmt" + "strings" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints/internal" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type endpointManager struct { - // TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 + // Client is an initialized etcd client. + client *clientv3.Client + target string } +// NewManager creates an endpoint manager which implements the interface of 'Manager'. func NewManager(client *clientv3.Client, target string) (Manager, error) { - // To be implemented (https://github.com/etcd-io/etcd/issues/12652) - return nil, fmt.Errorf("Not implemented yet") + if client == nil { + return nil, errors.New("invalid etcd client") + } + + if target == "" { + return nil, errors.New("invalid target") + } + + em := &endpointManager{ + client: client, + target: target, + } + return em, nil } -func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { - // TODO: For loop in a single transaction: - internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. - switch internalUpdate.Op { - //case internal.Add: - // var v []byte - // if v, err = json.Marshal(internalUpdate); err != nil { - // return status.Error(codes.InvalidArgument, err.Error()) - // } - // _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - //case internal.Delete: - // _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - //default: - // return status.Error(codes.InvalidArgument, "naming: bad naming op") +func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) { + ops := make([]clientv3.Op, 0, len(updates)) + for _, update := range updates { + if !strings.HasPrefix(update.Key, m.target+"/") { + return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target) + } + + switch update.Op { + case Add: + internalUpdate := &internal.Update{ + Op: internal.Add, + Addr: update.Endpoint.Addr, + Metadata: update.Endpoint.Metadata, + } + + var v []byte + if v, err = json.Marshal(internalUpdate); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...)) + case Delete: + ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...)) + default: + return status.Error(codes.InvalidArgument, "endpoints: bad update op") + } } - return fmt.Errorf("Not implemented yet") + _, err = m.client.KV.Txn(ctx).Then(ops...).Commit() + return err } func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { @@ -116,6 +148,19 @@ func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, er } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { - // TODO: Implementation - return nil, fmt.Errorf("Not implemented yet") + resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + eps := make(Key2EndpointMap) + for _, kv := range resp.Kvs { + var iup internal.Update + if err := json.Unmarshal(kv.Value, &iup); err != nil { + continue + } + + eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata} + } + return eps, nil } diff --git a/pkg/go.sum b/pkg/go.sum index dfc6f2045..428127d1b 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -21,7 +21,9 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -64,6 +66,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -74,6 +77,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 h1:bNEHhJCnrwMKNMmOx3yAynp5vs5/gRy+XWFtZFu7NBM= golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -87,6 +91,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= diff --git a/pkg/grpc_testing/stub_server.go b/pkg/grpc_testing/stub_server.go new file mode 100644 index 000000000..43d0d8584 --- /dev/null +++ b/pkg/grpc_testing/stub_server.go @@ -0,0 +1,86 @@ +package grpc_testing + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// StubServer is borrowed from the interal package of grpc-go. +// See https://github.com/grpc/grpc-go/blob/master/internal/stubserver/stubserver.go +// Since it cannot be imported directly, we have to copy and paste it here, +// and useless code for our testing is removed. + +// StubServer is a server that is easy to customize within individual test +// cases. +type StubServer struct { + // Guarantees we satisfy this interface; panics if unimplemented methods are called. + testpb.TestServiceServer + + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error + + s *grpc.Server + + // Network and Address are parameters for Listen. Defaults will be used if these are empty before Start. + Network string + Address string + + cleanups []func() // Lambdas executed in Stop(); populated by Start(). +} + +// EmptyCall is the handler for testpb.EmptyCall. +func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return ss.EmptyCallF(ctx, in) +} + +// UnaryCall is the handler for testpb.UnaryCall. +func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return ss.UnaryCallF(ctx, in) +} + +// FullDuplexCall is the handler for testpb.FullDuplexCall. +func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { + return ss.FullDuplexCallF(stream) +} + +// Start starts the server and creates a client connected to it. +func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { + if ss.Network == "" { + ss.Network = "tcp" + } + if ss.Address == "" { + ss.Address = "localhost:0" + } + + lis, err := net.Listen(ss.Network, ss.Address) + if err != nil { + return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err) + } + ss.Address = lis.Addr().String() + ss.cleanups = append(ss.cleanups, func() { lis.Close() }) + + s := grpc.NewServer(sopts...) + testpb.RegisterTestServiceServer(s, ss) + go s.Serve(lis) + ss.cleanups = append(ss.cleanups, s.Stop) + ss.s = s + + return nil +} + +// Stop stops ss and cleans up all resources it consumed. +func (ss *StubServer) Stop() { + for i := len(ss.cleanups) - 1; i >= 0; i-- { + ss.cleanups[i]() + } +} + +// Addr gets the address the server listening on. +func (ss *StubServer) Addr() string { + return ss.Address +} diff --git a/tests/integration/clientv3/naming/endpoints_test.go b/tests/integration/clientv3/naming/endpoints_test.go index 4bc8a6463..606d0e537 100644 --- a/tests/integration/clientv3/naming/endpoints_test.go +++ b/tests/integration/clientv3/naming/endpoints_test.go @@ -133,3 +133,84 @@ func TestEndpointManagerAtomicity(t *testing.T) { t.Fatalf("expected two delete updates, got %+v", updates) } } + +func TestEndpointManagerCRUD(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + // Add + k1 := "foo/a1" + e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"} + err = em.AddEndpoint(context.TODO(), k1, e1) + if err != nil { + t.Fatal("failed to add", k1, err) + } + + k2 := "foo/a2" + e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"} + err = em.AddEndpoint(context.TODO(), k2, e2) + if err != nil { + t.Fatal("failed to add", k2, err) + } + + eps, err := em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 2 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k1], e1) { + t.Fatalf("unexpected endpoints: %s", k1) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Delete + err = em.DeleteEndpoint(context.TODO(), k1) + if err != nil { + t.Fatal("failed to delete", k2, err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Update + k3 := "foo/a3" + e3 := endpoints.Endpoint{Addr: "127.0.0.3", Metadata: "metadata3"} + updates := []*endpoints.UpdateWithOpts{ + {Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}}, + {Update: endpoints.Update{Op: endpoints.Delete, Key: k2}}, + } + err = em.Update(context.TODO(), updates) + if err != nil { + t.Fatal("failed to update", err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k3], e3) { + t.Fatalf("unexpected endpoints: %s", k3) + } +} diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go index f260095e0..5081ba7c2 100644 --- a/tests/integration/clientv3/naming/resolver_test.go +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -15,56 +15,113 @@ package naming_test import ( + "bytes" "context" "testing" + "time" + etcdnaming "go.etcd.io/etcd/client/v3/naming" "go.etcd.io/etcd/client/v3/naming/endpoints" - "go.etcd.io/etcd/client/v3/naming/resolver" - "google.golang.org/grpc" - + grpctest "go.etcd.io/etcd/pkg/v3/grpc_testing" "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/integration" + + "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" ) // This test mimics scenario described in grpc_naming.md doc. func TestEtcdGrpcResolver(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) - // s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000 - // s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001 + s1PayloadBody := []byte{'1'} + s1 := newDummyStubServer(s1PayloadBody) + if err := s1.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s1)", err) + } + defer s1.Stop() - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + s2PayloadBody := []byte{'2'} + s2 := newDummyStubServer(s2PayloadBody) + if err := s2.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s2)", err) + } + defer s2.Stop() + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - em, err := endpoints.NewManager(clus.RandClient(), "foo") + em, err := endpoints.NewManager(clus.Client(0), "foo") if err != nil { t.Fatal("failed to create EndpointManager", err) } - e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"} - e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"} + e1 := endpoints.Endpoint{Addr: s1.Addr()} + e2 := endpoints.Endpoint{Addr: s2.Addr()} err = em.AddEndpoint(context.TODO(), "foo/e1", e1) if err != nil { t.Fatal("failed to add foo", err) } - etcdResolver, err := resolver.NewBuilder(clus.RandClient()) - conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) + r := &etcdnaming.GRPCResolver{Client: clus.Client(1)} + b := grpc.RoundRobin(r) + + conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b)) if err != nil { - t.Fatal("failed to connect to foo (e1)", err) + t.Fatal("failed to connect to foo", err) } + defer conn.Close() - // TODO: send requests to conn, ensure s1 received it. + c := testpb.NewTestServiceClient(conn) + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Fatal("failed to invoke rpc to foo (e1)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) { + t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) + } em.DeleteEndpoint(context.TODO(), "foo/e1") em.AddEndpoint(context.TODO(), "foo/e2", e2) - // TODO: Send requests to conn and make sure s2 receive it. - // Might require restarting s1 to break the existing (open) connection. + // We use a loop with deadline of 30s to avoid test getting flake + // as it's asynchronous for gRPC Client to update underlying connections. + maxRetries := 300 + retryPeriod := 100 * time.Millisecond + retries := 0 + for { + time.Sleep(retryPeriod) + retries++ - conn.GetState() // this line is to avoid compiler warning that conn is unused. + resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}) + if err != nil { + if retries < maxRetries { + continue + } + t.Fatal("failed to invoke rpc to foo (e2)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) { + if retries < maxRetries { + continue + } + t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody()) + } + + break + } +} + +func newDummyStubServer(body []byte) *grpctest.StubServer { + return &grpctest.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Type: testpb.PayloadType_COMPRESSABLE, + Body: body, + }, + }, nil + }, + } }