diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go index 61abb832e..37f04803e 100644 --- a/client/v3/naming/endpoints/endpoints_impl.go +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -43,7 +43,7 @@ func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) ops := make([]clientv3.Op, 0, len(updates)) for _, update := range updates { if !strings.HasPrefix(update.Key, m.target+"/") { - return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target) + return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key) } switch update.Op { diff --git a/client/v3/naming/grpc.go b/client/v3/naming/grpc.go deleted file mode 100644 index d12e7e2b8..000000000 --- a/client/v3/naming/grpc.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package naming - -import ( - "context" - "encoding/json" - "fmt" - - etcd "go.etcd.io/etcd/client/v3" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/naming" - "google.golang.org/grpc/status" -) - -var ErrWatcherClosed = fmt.Errorf("naming: watch closed") - -// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. -type GRPCResolver struct { - // Client is an initialized etcd client. - Client *etcd.Client -} - -func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { - switch nm.Op { - case naming.Add: - var v []byte - if v, err = json.Marshal(nm); err != nil { - return status.Error(codes.InvalidArgument, err.Error()) - } - _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - case naming.Delete: - _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - default: - return status.Error(codes.InvalidArgument, "naming: bad naming op") - } - return err -} - -func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { - ctx, cancel := context.WithCancel(context.Background()) - w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} - return w, nil -} - -type gRPCWatcher struct { - c *etcd.Client - target string - ctx context.Context - cancel context.CancelFunc - wch etcd.WatchChan - err error -} - -// Next gets the next set of updates from the etcd resolver. -// Calls to Next should be serialized; concurrent calls are not safe since -// there is no way to reconcile the update ordering. -func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { - if gw.wch == nil { - // first Next() returns all addresses - return gw.firstNext() - } - if gw.err != nil { - return nil, gw.err - } - - // process new events on target/* - wr, ok := <-gw.wch - if !ok { - gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) - return nil, gw.err - } - if gw.err = wr.Err(); gw.err != nil { - return nil, gw.err - } - - updates := make([]*naming.Update, 0, len(wr.Events)) - for _, e := range wr.Events { - var jupdate naming.Update - var err error - switch e.Type { - case etcd.EventTypePut: - err = json.Unmarshal(e.Kv.Value, &jupdate) - jupdate.Op = naming.Add - case etcd.EventTypeDelete: - err = json.Unmarshal(e.PrevKv.Value, &jupdate) - jupdate.Op = naming.Delete - default: - continue - } - if err == nil { - updates = append(updates, &jupdate) - } - } - return updates, nil -} - -func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { - // Use serialized request so resolution still works if the target etcd - // server is partitioned away from the quorum. - resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) - if gw.err = err; err != nil { - return nil, err - } - - updates := make([]*naming.Update, 0, len(resp.Kvs)) - for _, kv := range resp.Kvs { - var jupdate naming.Update - if err := json.Unmarshal(kv.Value, &jupdate); err != nil { - continue - } - updates = append(updates, &jupdate) - } - - opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} - gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) - return updates, nil -} - -func (gw *gRPCWatcher) Close() { gw.cancel() } diff --git a/server/proxy/grpcproxy/cluster.go b/server/proxy/grpcproxy/cluster.go index 58e1a9e4a..1f7dccbe7 100644 --- a/server/proxy/grpcproxy/cluster.go +++ b/server/proxy/grpcproxy/cluster.go @@ -22,13 +22,11 @@ import ( "sync" pb "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/naming" + "go.etcd.io/etcd/client/v3/naming/endpoints" + "golang.org/x/time/rate" "go.uber.org/zap" - "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -38,39 +36,51 @@ type clusterProxy struct { lg *zap.Logger clus clientv3.Cluster ctx context.Context - gr *naming.GRPCResolver // advertise client URL advaddr string prefix string + em endpoints.Manager + umu sync.RWMutex - umap map[string]gnaming.Update + umap map[string]endpoints.Endpoint } // NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints. // The returned channel is closed when there is grpc-proxy endpoint registered // and the client's context is canceled so the 'register' loop returns. +// TODO: Expand the API to report creation errors func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) { if lg == nil { lg = zap.NewNop() } + + var em endpoints.Manager + if advaddr != "" && prefix != "" { + var err error + if em, err = endpoints.NewManager(c, prefix); err != nil { + lg.Error("failed to provision endpointsManager", zap.String("prefix", prefix), zap.Error(err)) + return nil, nil + } + } + cp := &clusterProxy{ lg: lg, clus: c.Cluster, ctx: c.Ctx(), - gr: &naming.GRPCResolver{Client: c}, advaddr: advaddr, prefix: prefix, - umap: make(map[string]gnaming.Update), + umap: make(map[string]endpoints.Endpoint), + em: em, } donec := make(chan struct{}) - if advaddr != "" && prefix != "" { + if em != nil { go func() { defer close(donec) - cp.resolve(prefix) + cp.establishEndpointWatch(prefix) }() return cp, donec } @@ -79,38 +89,36 @@ func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix return cp, donec } -func (cp *clusterProxy) resolve(prefix string) { +func (cp *clusterProxy) establishEndpointWatch(prefix string) { rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate) for rm.Wait(cp.ctx) == nil { - wa, err := cp.gr.Resolve(prefix) + wc, err := cp.em.NewWatchChannel(cp.ctx) if err != nil { - cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err)) + cp.lg.Warn("failed to establish endpoint watch", zap.String("prefix", prefix), zap.Error(err)) continue } - cp.monitor(wa) + cp.monitor(wc) } } -func (cp *clusterProxy) monitor(wa gnaming.Watcher) { - for cp.ctx.Err() == nil { - ups, err := wa.Next() - if err != nil { - cp.lg.Warn("clusterProxy watcher error", zap.Error(err)) - if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { - return +func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) { + for { + select { + case <-cp.ctx.Done(): + cp.lg.Info("watching endpoints interrupted", zap.Error(cp.ctx.Err())) + return + case updates := <-wa: + cp.umu.Lock() + for _, up := range updates { + switch up.Op { + case endpoints.Add: + cp.umap[up.Endpoint.Addr] = up.Endpoint + case endpoints.Delete: + delete(cp.umap, up.Endpoint.Addr) + } } + cp.umu.Unlock() } - - cp.umu.Lock() - for i := range ups { - switch ups[i].Op { - case gnaming.Add: - cp.umap[ups[i].Addr] = *ups[i] - case gnaming.Delete: - delete(cp.umap, ups[i].Addr) - } - } - cp.umu.Unlock() } } diff --git a/server/proxy/grpcproxy/register.go b/server/proxy/grpcproxy/register.go index dcd7d4d86..14ec034ea 100644 --- a/server/proxy/grpcproxy/register.go +++ b/server/proxy/grpcproxy/register.go @@ -20,11 +20,10 @@ import ( "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/client/v3/naming" + "go.etcd.io/etcd/client/v3/naming/endpoints" "go.uber.org/zap" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -68,8 +67,12 @@ func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr str return nil, err } - gr := &naming.GRPCResolver{Client: c} - if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + return nil, err + } + endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()} + if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil { return nil, err } diff --git a/tests/integration/clientv3/grpc_test.go b/tests/integration/clientv3/grpc_test.go deleted file mode 100644 index d2c13afab..000000000 --- a/tests/integration/clientv3/grpc_test.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package clientv3test - -import ( - "context" - "encoding/json" - "reflect" - "testing" - - etcd "go.etcd.io/etcd/client/v3" - namingv3 "go.etcd.io/etcd/client/v3/naming" - "go.etcd.io/etcd/pkg/v3/testutil" - "go.etcd.io/etcd/tests/v3/integration" - - "google.golang.org/grpc/naming" -) - -func TestGRPCResolver(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - - r := namingv3.GRPCResolver{ - Client: clus.RandClient(), - } - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"} - err = r.Update(context.TODO(), "foo", addOp) - if err != nil { - t.Fatal("failed to add foo", err) - } - - us, err := w.Next() - if err != nil { - t.Fatal("failed to get udpate", err) - } - - wu := &naming.Update{ - Op: naming.Add, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } - - delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"} - err = r.Update(context.TODO(), "foo", delOp) - if err != nil { - t.Fatalf("failed to udpate %v", err) - } - - us, err = w.Next() - if err != nil { - t.Fatalf("failed to get udpate %v", err) - } - - wu = &naming.Update{ - Op: naming.Delete, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } -} - -// TestGRPCResolverMulti ensures the resolver will initialize -// correctly with multiple hosts and correctly receive multiple -// updates in a single revision. -func TestGRPCResolverMulti(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - c := clus.RandClient() - - v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"}) - if verr != nil { - t.Fatal(verr) - } - if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil { - t.Fatal(err) - } - if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil { - t.Fatal(err) - } - - r := namingv3.GRPCResolver{Client: c} - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - updates, nerr := w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 { - t.Fatalf("expected two updates, got %+v", updates) - } - - _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() - if err != nil { - t.Fatal(err) - } - - updates, nerr = w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) { - t.Fatalf("expected two updates, got %+v", updates) - } -} diff --git a/tests/integration/proxy/grpcproxy/register_test.go b/tests/integration/proxy/grpcproxy/register_test.go index d197f92c9..914986c5f 100644 --- a/tests/integration/proxy/grpcproxy/register_test.go +++ b/tests/integration/proxy/grpcproxy/register_test.go @@ -19,13 +19,12 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/naming" + "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/proxy/grpcproxy" "go.etcd.io/etcd/tests/v3/integration" "go.uber.org/zap" - gnaming "google.golang.org/grpc/naming" ) func TestRegister(t *testing.T) { @@ -37,26 +36,16 @@ func TestRegister(t *testing.T) { paddr := clus.Members[0].GRPCAddr() testPrefix := "test-name" - wa := createWatcher(t, cli, testPrefix) - ups, err := wa.Next() - if err != nil { - t.Fatal(err) - } - if len(ups) != 0 { - t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups) - } + wa := mustCreateWatcher(t, cli, testPrefix) donec := grpcproxy.Register(zap.NewExample(), cli, testPrefix, paddr, 5) - ups, err = wa.Next() - if err != nil { - t.Fatal(err) - } + ups := <-wa if len(ups) != 1 { t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups) } - if ups[0].Addr != paddr { - t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr) + if ups[0].Endpoint.Addr != paddr { + t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr) } cli.Close() @@ -68,11 +57,14 @@ func TestRegister(t *testing.T) { } } -func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher { - gr := &naming.GRPCResolver{Client: c} - watcher, err := gr.Resolve(prefix) +func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + t.Fatalf("failed to create endpoints.Manager: %v", err) + } + wc, err := em.NewWatchChannel(c.Ctx()) if err != nil { t.Fatalf("failed to resolve %q (%v)", prefix, err) } - return watcher + return wc }