From f0153222f1f773c7d163ca7d7014c4887f3fb310 Mon Sep 17 00:00:00 2001 From: Ramil Mirhasanov Date: Mon, 19 Dec 2022 14:34:16 +0300 Subject: [PATCH] clientv3/naming/endpoints: fix endpoints prefix bug fixes bug with multiple endpoints with same prefix Signed-off-by: Ramil Mirhasanov --- client/v3/naming/endpoints/endpoints_impl.go | 9 +++-- .../clientv3/naming/resolver_test.go | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go index 7796f7c9c..f88a3eed1 100644 --- a/client/v3/naming/endpoints/endpoints_impl.go +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -92,7 +92,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts . } func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { - resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + key := m.target + "/" + resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } @@ -126,7 +127,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd lg := m.client.GetLogger() opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()} - wch := m.client.Watch(ctx, m.target, opts...) + key := m.target + "/" + wch := m.client.Watch(ctx, key, opts...) for { select { case <-ctx.Done(): @@ -171,7 +173,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { - resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + key := m.target + "/" + resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go index 1f9d2a5fc..14c3ad723 100644 --- a/tests/integration/clientv3/naming/resolver_test.go +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/client/v3/naming/resolver" "go.etcd.io/etcd/pkg/v3/grpc_testing" @@ -111,3 +112,41 @@ func TestEtcdGrpcResolver(t *testing.T) { break } } + +func TestEtcdEndpointManager(t *testing.T) { + integration2.BeforeTest(t) + + s1PayloadBody := []byte{'1'} + s1 := grpc_testing.NewDummyStubServer(s1PayloadBody) + err := s1.Start(nil) + assert.NoError(t, err) + defer s1.Stop() + + s2PayloadBody := []byte{'2'} + s2 := grpc_testing.NewDummyStubServer(s2PayloadBody) + err = s2.Start(nil) + assert.NoError(t, err) + defer s2.Stop() + + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints + em, err := endpoints.NewManager(clus.Client(0), "foo") + assert.NoError(t, err) + emOther, err := endpoints.NewManager(clus.Client(1), "foo_other") + assert.NoError(t, err) + + e1 := endpoints.Endpoint{Addr: s1.Addr()} + e2 := endpoints.Endpoint{Addr: s2.Addr()} + + em.AddEndpoint(context.Background(), "foo/e1", e1) + emOther.AddEndpoint(context.Background(), "foo_other/e2", e2) + + epts, err := em.List(context.Background()) + assert.NoError(t, err) + eptsOther, err := emOther.List(context.Background()) + assert.NoError(t, err) + assert.Equal(t, len(epts), 1) + assert.Equal(t, len(eptsOther), 1) +}