diff --git a/clientv3/integration/naming/endpoints_test.go b/clientv3/integration/naming/endpoints_test.go index be59479eb..25b6be28e 100644 --- a/clientv3/integration/naming/endpoints_test.go +++ b/clientv3/integration/naming/endpoints_test.go @@ -133,3 +133,84 @@ func TestEndpointManagerAtomicity(t *testing.T) { t.Fatalf("expected two delete updates, got %+v", updates) } } + +func TestEndpointManagerCRUD(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + // Add + k1 := "foo/a1" + e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"} + err = em.AddEndpoint(context.TODO(), k1, e1) + if err != nil { + t.Fatal("failed to add", k1, err) + } + + k2 := "foo/a2" + e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"} + err = em.AddEndpoint(context.TODO(), k2, e2) + if err != nil { + t.Fatal("failed to add", k2, err) + } + + eps, err := em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 2 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k1], e1) { + t.Fatalf("unexpected endpoints: %s", k1) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Delete + err = em.DeleteEndpoint(context.TODO(), k1) + if err != nil { + t.Fatal("failed to delete", k2, err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Update + k3 := "foo/a3" + e3 := endpoints.Endpoint{Addr: "127.0.0.3", Metadata: "metadata3"} + updates := []*endpoints.UpdateWithOpts{ + {Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}}, + {Update: endpoints.Update{Op: endpoints.Delete, Key: k2}}, + } + err = em.Update(context.TODO(), updates) + if err != nil { + t.Fatal("failed to update", err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k3], e3) { + t.Fatalf("unexpected endpoints: %s", k3) + } +} diff --git a/clientv3/integration/naming/resolver_test.go b/clientv3/integration/naming/resolver_test.go index ecb90d591..f65f5c3f3 100644 --- a/clientv3/integration/naming/resolver_test.go +++ b/clientv3/integration/naming/resolver_test.go @@ -15,56 +15,111 @@ package naming import ( + "bytes" "context" "testing" + "time" "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" + etcdnaming "go.etcd.io/etcd/clientv3/naming" "go.etcd.io/etcd/clientv3/naming/endpoints" - "go.etcd.io/etcd/clientv3/naming/resolver" "go.etcd.io/etcd/integration" + grpctest "go.etcd.io/etcd/pkg/grpc_testing" "go.etcd.io/etcd/pkg/testutil" ) // This test mimics scenario described in grpc_naming.md doc. func TestEtcdGrpcResolver(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) + s1PayloadBody := []byte{'1'} + s1 := newDummyStubServer(s1PayloadBody) + if err := s1.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s1)", err) + } + defer s1.Stop() - // s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000 - // s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001 + s2PayloadBody := []byte{'2'} + s2 := newDummyStubServer(s2PayloadBody) + if err := s2.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s2)", err) + } + defer s2.Stop() - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - em, err := endpoints.NewManager(clus.RandClient(), "foo") + em, err := endpoints.NewManager(clus.Client(0), "foo") if err != nil { t.Fatal("failed to create EndpointManager", err) } - e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"} - e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"} + e1 := endpoints.Endpoint{Addr: s1.Addr()} + e2 := endpoints.Endpoint{Addr: s2.Addr()} err = em.AddEndpoint(context.TODO(), "foo/e1", e1) if err != nil { t.Fatal("failed to add foo", err) } - etcdResolver, err := resolver.NewBuilder(clus.RandClient()) + r := &etcdnaming.GRPCResolver{Client: clus.Client(1)} + b := grpc.RoundRobin(r) - conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) + conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b)) if err != nil { - t.Fatal("failed to connect to foo (e1)", err) + t.Fatal("failed to connect to foo", err) } + defer conn.Close() - // TODO: send requests to conn, ensure s1 received it. + c := testpb.NewTestServiceClient(conn) + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Fatal("failed to invoke rpc to foo (e1)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) { + t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) + } em.DeleteEndpoint(context.TODO(), "foo/e1") em.AddEndpoint(context.TODO(), "foo/e2", e2) - // TODO: Send requests to conn and make sure s2 receive it. - // Might require restarting s1 to break the existing (open) connection. + // We use a loop with deadline of 30s to avoid test getting flake + // as it's asynchronous for gRPC Client to update underlying connections. + maxRetries := 300 + retryPeriod := 100 * time.Millisecond + retries := 0 + for { + time.Sleep(retryPeriod) + retries++ - conn.GetState() // this line is to avoid compiler warning that conn is unused. + resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}) + if err != nil { + if retries < maxRetries { + continue + } + t.Fatal("failed to invoke rpc to foo (e2)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) { + if retries < maxRetries { + continue + } + t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody()) + } + + break + } +} + +func newDummyStubServer(body []byte) *grpctest.StubServer { + return &grpctest.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Type: testpb.PayloadType_COMPRESSABLE, + Body: body, + }, + }, nil + }, + } } diff --git a/clientv3/naming/endpoints/endpoints_impl.go b/clientv3/naming/endpoints/endpoints_impl.go index beeb58eea..e1767c8d1 100644 --- a/clientv3/naming/endpoints/endpoints_impl.go +++ b/clientv3/naming/endpoints/endpoints_impl.go @@ -18,37 +18,66 @@ package endpoints import ( "context" + "encoding/json" + "errors" "fmt" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/naming/endpoints/internal" ) type endpointManager struct { - // TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 + client *clientv3.Client + target string } func NewManager(client *clientv3.Client, target string) (Manager, error) { - // To be implemented (https://github.com/etcd-io/etcd/issues/12652) - return nil, fmt.Errorf("Not implemented yet") + if client == nil { + return nil, errors.New("invalid etcd client") + } + + if target == "" { + return nil, errors.New("invalid target") + } + + em := &endpointManager{ + client: client, + target: target, + } + return em, nil } -func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { - // TODO: For loop in a single transaction: - internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. - switch internalUpdate.Op { - //case internal.Add: - // var v []byte - // if v, err = json.Marshal(internalUpdate); err != nil { - // return status.Error(codes.InvalidArgument, err.Error()) - // } - // _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - //case internal.Delete: - // _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - //default: - // return status.Error(codes.InvalidArgument, "naming: bad naming op") +func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) { + ops := make([]clientv3.Op, 0, len(updates)) + for _, update := range updates { + if !strings.HasPrefix(update.Key, m.target+"/") { + return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target) + } + switch update.Op { + case Add: + internalUpdate := &internal.Update{ + Op: internal.Add, + Addr: update.Endpoint.Addr, + Metadata: update.Endpoint.Metadata, + } + + var v []byte + if v, err = json.Marshal(internalUpdate); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...)) + case Delete: + ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...)) + default: + return status.Error(codes.InvalidArgument, "endpoints: bad update op") + } } - return fmt.Errorf("Not implemented yet") + _, err = m.client.KV.Txn(ctx).Then(ops...).Commit() + return err } func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { @@ -130,6 +159,19 @@ func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, er } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { - // TODO: Implementation - return nil, fmt.Errorf("Not implemented yet") + resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + eps := make(Key2EndpointMap) + for _, kv := range resp.Kvs { + var iup internal.Update + if err := json.Unmarshal(kv.Value, &iup); err != nil { + continue + } + + eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata} + } + return eps, nil } diff --git a/pkg/grpc_testing/stub_server.go b/pkg/grpc_testing/stub_server.go new file mode 100644 index 000000000..b2892ef0d --- /dev/null +++ b/pkg/grpc_testing/stub_server.go @@ -0,0 +1,100 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_testing + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// StubServer is borrowed from the interal package of grpc-go. +// See https://github.com/grpc/grpc-go/blob/master/internal/stubserver/stubserver.go +// Since it cannot be imported directly, we have to copy and paste it here, +// and useless code for our testing is removed. + +// StubServer is a server that is easy to customize within individual test +// cases. +type StubServer struct { + // Guarantees we satisfy this interface; panics if unimplemented methods are called. + testpb.TestServiceServer + + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error + + s *grpc.Server + + // Network and Address are parameters for Listen. Defaults will be used if these are empty before Start. + Network string + Address string + + cleanups []func() // Lambdas executed in Stop(); populated by Start(). +} + +// EmptyCall is the handler for testpb.EmptyCall. +func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return ss.EmptyCallF(ctx, in) +} + +// UnaryCall is the handler for testpb.UnaryCall. +func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return ss.UnaryCallF(ctx, in) +} + +// FullDuplexCall is the handler for testpb.FullDuplexCall. +func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { + return ss.FullDuplexCallF(stream) +} + +// Start starts the server and creates a client connected to it. +func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { + if ss.Network == "" { + ss.Network = "tcp" + } + if ss.Address == "" { + ss.Address = "localhost:0" + } + + lis, err := net.Listen(ss.Network, ss.Address) + if err != nil { + return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err) + } + ss.Address = lis.Addr().String() + ss.cleanups = append(ss.cleanups, func() { lis.Close() }) + + s := grpc.NewServer(sopts...) + testpb.RegisterTestServiceServer(s, ss) + go s.Serve(lis) + ss.cleanups = append(ss.cleanups, s.Stop) + ss.s = s + + return nil +} + +// Stop stops ss and cleans up all resources it consumed. +func (ss *StubServer) Stop() { + for i := len(ss.cleanups) - 1; i >= 0; i-- { + ss.cleanups[i]() + } +} + +// Addr gets the address the server listening on. +func (ss *StubServer) Addr() string { + return ss.Address +}