From 5d7c1db3a96c7ed13568e21efc19ee04403e69be Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 21 Jan 2021 22:13:38 +0100 Subject: [PATCH] Introduce grpc-1.30+ compatible client/v3/naming API. This is not yet implementation, just API and tests to be filled with implementation in next CLs, tracked by: https://github.com/etcd-io/etcd/issues/12652 We propose here 3 packages: - clientv3/naming/endpoints -> That is abstraction layer over etcd that allows to write, read & watch Endpoints information. It's independent from GRPC API. It hides the storage details. - clientv3/naming/endpoints/internal -> That contains the grpc's compatible Update class to preserve the internal JSON mashalling format. - clientv3/naming/resolver -> That implements the GRPC resolver API, such that etcd can be used for connection.Dial in grpc. Please see the grpc_naming.md document changes & grpcproxy/cluster.go new integration, to see how the new abstractions work. --- Documentation/dev-guide/grpc_naming.md | 48 +++++-- client/v3/naming/doc.go | 31 ++-- client/v3/naming/endpoints/endpoints.go | 82 +++++++++++ client/v3/naming/endpoints/endpoints_impl.go | 121 ++++++++++++++++ client/v3/naming/endpoints/internal/update.go | 38 +++++ client/v3/naming/resolver/resolver.go | 24 ++++ .../clientv3/naming/endpoints_test.go | 135 ++++++++++++++++++ .../integration/clientv3/naming/main_test.go | 15 ++ .../clientv3/naming/resolver_test.go | 70 +++++++++ 9 files changed, 536 insertions(+), 28 deletions(-) create mode 100644 client/v3/naming/endpoints/endpoints.go create mode 100644 client/v3/naming/endpoints/endpoints_impl.go create mode 100644 client/v3/naming/endpoints/internal/update.go create mode 100644 client/v3/naming/resolver/resolver.go create mode 100644 tests/integration/clientv3/naming/endpoints_test.go create mode 100644 tests/integration/clientv3/naming/main_test.go create mode 100644 tests/integration/clientv3/naming/resolver_test.go diff --git a/Documentation/dev-guide/grpc_naming.md b/Documentation/dev-guide/grpc_naming.md index 6dfa4863f..6038b4c0b 100644 --- a/Documentation/dev-guide/grpc_naming.md +++ b/Documentation/dev-guide/grpc_naming.md @@ -8,40 +8,41 @@ etcd provides a gRPC resolver to support an alternative name system that fetches ## Using etcd discovery with go-grpc -The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution: +The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client: ```go import ( "go.etcd.io/etcd/v3/clientv3" - etcdnaming "go.etcd.io/etcd/v3/clientv3/naming" + resolver "go.etcd.io/etcd/v3/clientv3/naming/resolver" "google.golang.org/grpc" ) -... - cli, cerr := clientv3.NewFromURL("http://localhost:2379") -r := &etcdnaming.GRPCResolver{Client: cli} -b := grpc.RoundRobin(r) -conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...) +etcdResolver, err := resolver.NewBuilder(clus.RandClient()); +conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver)) ``` ## Managing service endpoints -The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys. +The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/") +with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints. +Endpoints are added to the service by creating new keys and removed from the service by deleting keys. ### Adding an endpoint New endpoints can be added to the service through `etcdctl`: ```sh -ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' +ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' ``` -The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`: +The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`: ```go -r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."}) + +em := endpoints.NewManager(client, "foo/bar/my-service") +err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"}); ``` ### Deleting an endpoint @@ -49,13 +50,14 @@ r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2. Hosts can be deleted from the service through `etcdctl`: ```sh -ETCDCTL_API=3 etcdctl del my-service/1.2.3.4 +ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4 ``` -The etcd client's `GRPCResolver.Update` method also supports deleting endpoints: +The etcd client's `endpoints.Manager` method also supports deleting endpoints: ```go -r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"}) +em := endpoints.NewManager(client, "foo/bar/my-service") +err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1"); ``` ### Registering an endpoint with a lease @@ -67,3 +69,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '` ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}' ETCDCTL_API=3 etcdctl lease keep-alive $lease ``` +In the golang: + +```go +em := endpoints.NewManager(client, "foo/bar/my-service") +err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"}); +``` + +### Atomically updating endpoints + +If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly: + +``` +em := endpoints.NewManager(c, "foo") + +err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{ + endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}), + endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})}) +``` \ No newline at end of file diff --git a/client/v3/naming/doc.go b/client/v3/naming/doc.go index df141f41a..1b50eba7a 100644 --- a/client/v3/naming/doc.go +++ b/client/v3/naming/doc.go @@ -12,45 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services. +// Package naming provides: +// - subpackage endpoints: an abstraction layer to store and read endpoints +// information from etcd. +// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC +// services based on the endpoints configuration // // To use, first import the packages: // // import ( // "go.etcd.io/etcd/client/v3" -// etcdnaming "go.etcd.io/etcd/client/v3/naming" -// +// "go.etcd.io/etcd/client/v3/naming/endpoints" +// "go.etcd.io/etcd/client/v3/naming/resolver" // "google.golang.org/grpc" -// "google.golang.org/grpc/naming" // ) // // First, register new endpoint addresses for a service: // // func etcdAdd(c *clientv3.Client, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}) +// em := endpoints.NewManager(c, service) +// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}); // } // // Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer: // // func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) { -// r := &etcdnaming.GRPCResolver{Client: c} -// b := grpc.RoundRobin(r) -// return grpc.Dial(service, grpc.WithBalancer(b)) +// etcdResolver, err := resolver.NewBuilder(c); +// if err { return nil, err } +// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) // } // // Optionally, force delete an endpoint: // // func etcdDelete(c *clientv3, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"}) +// em := endpoints.NewManager(c, service) +// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr) // } // // Or register an expiring endpoint with a lease: // -// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error { -// r := &etcdnaming.GRPCResolver{Client: c} -// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid)) +// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error { +// em := endpoints.NewManager(c, service) +// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid)); // } // package naming diff --git a/client/v3/naming/endpoints/endpoints.go b/client/v3/naming/endpoints/endpoints.go new file mode 100644 index 000000000..329117af9 --- /dev/null +++ b/client/v3/naming/endpoints/endpoints.go @@ -0,0 +1,82 @@ +package endpoints + +import ( + "context" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Endpoint represents a single address the connection can be established with. +// +// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address. +// Please document etcd version since which version each field is supported. +type Endpoint struct { + // Addr is the server address on which a connection will be established. + // Since etcd 3.1 + Addr string + + // Metadata is the information associated with Addr, which may be used + // to make load balancing decision. + // Since etcd 3.1 + Metadata interface{} +} + +type Operation uint8 + +const ( + // Add indicates an Endpoint is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update describes a single edit action of an Endpoint. +type Update struct { + // Op - action Add or Delete. + Op Operation + Key string + Endpoint Endpoint +} + +// WatchChannel is used to deliver notifications about endpoints updates. +type WatchChannel chan []*Update + +// Key2EndpointMap maps etcd key into struct describing the endpoint. +type Key2EndpointMap map[string]Endpoint + +// UpdateWithOpts describes endpoint update (add or delete) together +// with etcd options (e.g. to attach an endpoint to a lease). +type UpdateWithOpts struct { + Update + Opts []clientv3.OpOption +} + +// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration. +func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts { + return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts} +} + +// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion. +func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts { + return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts} +} + +// Manager can be used to add/remove & inspect endpoints stored in etcd for +// a particular target. +type Manager interface { + // Update allows to atomically add/remove a few endpoints from etcd. + Update(ctx context.Context, updates []*UpdateWithOpts) error + + // AddEndpoint registers a single endpoint in etcd. + // For more advanced use-cases use the Update method. + AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error + // DeleteEndpoint deletes a single endpoint stored in etcd. + // For more advanced use-cases use the Update method. + DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error + + // List returns all the endpoints for the current target as a map. + List(ctx context.Context) (Key2EndpointMap, error) + // NewWatchChannel creates a channel that populates or endpoint updates. + // Cancel the 'ctx' to close the watcher. + NewWatchChannel(ctx context.Context) (WatchChannel, error) +} diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go new file mode 100644 index 000000000..1fef5ea37 --- /dev/null +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -0,0 +1,121 @@ +package endpoints + +// TODO: The API is not yet implemented. + +import ( + "context" + "fmt" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/naming/endpoints/internal" +) + +type endpointManager struct { + // TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 +} + +func NewManager(client *clientv3.Client, target string) (Manager, error) { + // To be implemented (https://github.com/etcd-io/etcd/issues/12652) + return nil, fmt.Errorf("Not implemented yet") +} + +func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { + // TODO: For loop in a single transaction: + internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. + switch internalUpdate.Op { + //case internal.Add: + // var v []byte + // if v, err = json.Marshal(internalUpdate); err != nil { + // return status.Error(codes.InvalidArgument, err.Error()) + // } + // _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) + //case internal.Delete: + // _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) + //default: + // return status.Error(codes.InvalidArgument, "naming: bad naming op") + } + return fmt.Errorf("Not implemented yet") +} + +func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { + return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)}) +} + +func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error { + return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)}) +} + +func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { + return nil, fmt.Errorf("Not implemented yet") + + // TODO: Implementation to be inspired by: + // Next gets the next set of updates from the etcd resolver. + //// Calls to Next should be serialized; concurrent calls are not safe since + //// there is no way to reconcile the update ordering. + //func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { + // if gw.wch == nil { + // // first Next() returns all addresses + // return gw.firstNext() + // } + // if gw.err != nil { + // return nil, gw.err + // } + // + // // process new events on target/* + // wr, ok := <-gw.wch + // if !ok { + // gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) + // return nil, gw.err + // } + // if gw.err = wr.Err(); gw.err != nil { + // return nil, gw.err + // } + // + // updates := make([]*naming.Update, 0, len(wr.Events)) + // for _, e := range wr.Events { + // var jupdate naming.Update + // var err error + // switch e.Type { + // case etcd.EventTypePut: + // err = json.Unmarshal(e.Kv.Value, &jupdate) + // jupdate.Op = naming.Add + // case etcd.EventTypeDelete: + // err = json.Unmarshal(e.PrevKv.Value, &jupdate) + // jupdate.Op = naming.Delete + // default: + // continue + // } + // if err == nil { + // updates = append(updates, &jupdate) + // } + // } + // return updates, nil + //} + // + //func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { + // // Use serialized request so resolution still works if the target etcd + // // server is partitioned away from the quorum. + // resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) + // if gw.err = err; err != nil { + // return nil, err + // } + // + // updates := make([]*naming.Update, 0, len(resp.Kvs)) + // for _, kv := range resp.Kvs { + // var jupdate naming.Update + // if err := json.Unmarshal(kv.Value, &jupdate); err != nil { + // continue + // } + // updates = append(updates, &jupdate) + // } + // + // opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} + // gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) + // return updates, nil + //} +} + +func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { + // TODO: Implementation + return nil, fmt.Errorf("Not implemented yet") +} diff --git a/client/v3/naming/endpoints/internal/update.go b/client/v3/naming/endpoints/internal/update.go new file mode 100644 index 000000000..71aa83fed --- /dev/null +++ b/client/v3/naming/endpoints/internal/update.go @@ -0,0 +1,38 @@ +package internal + +// Operation describes action performed on endpoint (addition vs deletion). +// Must stay JSON-format compatible with: +// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Operation +type Operation uint8 + +const ( + // Add indicates a new address is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update defines a persistent (JSON marshalled) format representing +// endpoint within the etcd storage. +// +// As the format can be persisted by one version of etcd client library and +// read by other the format must be kept backward compatible and +// in particular must be superset of the grpc(<=1.29.1) naming.Update structure: +// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Update +// +// Please document since which version of etcd-client given property is supported. +// Please keep the naming consistent with e.g. https://pkg.go.dev/google.golang.org/grpc/resolver#Address. +// +// Notice that it is not valid having both empty string Addr and nil Metadata in an Update. +type Update struct { + // Op indicates the operation of the update. + // Since etcd 3.1. + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + // Since etcd 3.1. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + // Since etcd 3.1. + Metadata interface{} +} diff --git a/client/v3/naming/resolver/resolver.go b/client/v3/naming/resolver/resolver.go new file mode 100644 index 000000000..0c453c22e --- /dev/null +++ b/client/v3/naming/resolver/resolver.go @@ -0,0 +1,24 @@ +package resolver + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc/resolver" +) + +type builder struct { + // ... +} + +func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + // To be implemented... + // Using endpoints.NewWatcher() to subscribe for endpoints changes. + return nil, nil +} + +func (b builder) Scheme() string { + return "etcd" +} + +func NewBuilder(client *clientv3.Client) (resolver.Builder, error) { + return builder{}, nil +} diff --git a/tests/integration/clientv3/naming/endpoints_test.go b/tests/integration/clientv3/naming/endpoints_test.go new file mode 100644 index 000000000..4bc8a6463 --- /dev/null +++ b/tests/integration/clientv3/naming/endpoints_test.go @@ -0,0 +1,135 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming_test + +import ( + "context" + "reflect" + "testing" + + etcd "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/naming/endpoints" + + "go.etcd.io/etcd/pkg/v3/testutil" + "go.etcd.io/etcd/tests/v3/integration" +) + +func TestEndpointManager(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + ctx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + w, err := em.NewWatchChannel(ctx) + if err != nil { + t.Fatal("failed to establish watch", err) + } + + e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"} + err = em.AddEndpoint(context.TODO(), "foo/a1", e1) + if err != nil { + t.Fatal("failed to add foo", err) + } + + us := <-w + + if us == nil { + t.Fatal("failed to get update", err) + } + + wu := endpoints.Update{ + Op: endpoints.Add, + Key: "foo/a1", + Endpoint: e1, + } + + if !reflect.DeepEqual(us[0], wu) { + t.Fatalf("up = %#v, want %#v", us[0], wu) + } + + err = em.DeleteEndpoint(context.TODO(), "foo/a1") + if err != nil { + t.Fatalf("failed to udpate %v", err) + } + + us = <-w + if err != nil { + t.Fatalf("failed to get udpate %v", err) + } + + wu = endpoints.Update{ + Op: endpoints.Delete, + Key: "foo/a1", + } + + if !reflect.DeepEqual(us, wu) { + t.Fatalf("up = %#v, want %#v", us[1], wu) + } +} + +// TestEndpointManagerAtomicity ensures the resolver will initialize +// correctly with multiple hosts and correctly receive multiple +// updates in a single revision. +func TestEndpointManagerAtomicity(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + c := clus.RandClient() + em, err := endpoints.NewManager(c, "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{ + endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}), + endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})}) + if err != nil { + t.Fatal(err) + } + + ctx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + w, err := em.NewWatchChannel(ctx) + if err != nil { + t.Fatal(err) + } + + updates := <-w + if len(updates) != 2 { + t.Fatalf("expected two updates, got %+v", updates) + } + + _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() + if err != nil { + t.Fatal(err) + } + + updates = <-w + if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) { + t.Fatalf("expected two delete updates, got %+v", updates) + } +} diff --git a/tests/integration/clientv3/naming/main_test.go b/tests/integration/clientv3/naming/main_test.go new file mode 100644 index 000000000..cc511b14a --- /dev/null +++ b/tests/integration/clientv3/naming/main_test.go @@ -0,0 +1,15 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package naming_test + +import ( + "testing" + + "go.etcd.io/etcd/pkg/v3/testutil" +) + +func TestMain(m *testing.M) { + testutil.MustTestMainWithLeakDetection(m) +} diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go new file mode 100644 index 000000000..f260095e0 --- /dev/null +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -0,0 +1,70 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming_test + +import ( + "context" + "testing" + + "go.etcd.io/etcd/client/v3/naming/endpoints" + "go.etcd.io/etcd/client/v3/naming/resolver" + "google.golang.org/grpc" + + "go.etcd.io/etcd/pkg/v3/testutil" + "go.etcd.io/etcd/tests/v3/integration" +) + +// This test mimics scenario described in grpc_naming.md doc. + +func TestEtcdGrpcResolver(t *testing.T) { + t.Skip("Not implemented yet") + + defer testutil.AfterTest(t) + + // s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000 + // s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001 + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"} + e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"} + + err = em.AddEndpoint(context.TODO(), "foo/e1", e1) + if err != nil { + t.Fatal("failed to add foo", err) + } + etcdResolver, err := resolver.NewBuilder(clus.RandClient()) + + conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) + if err != nil { + t.Fatal("failed to connect to foo (e1)", err) + } + + // TODO: send requests to conn, ensure s1 received it. + + em.DeleteEndpoint(context.TODO(), "foo/e1") + em.AddEndpoint(context.TODO(), "foo/e2", e2) + + // TODO: Send requests to conn and make sure s2 receive it. + // Might require restarting s1 to break the existing (open) connection. + + conn.GetState() // this line is to avoid compiler warning that conn is unused. +}