diff --git a/client/v3/naming/endpoints/endpoints_impl.go b/client/v3/naming/endpoints/endpoints_impl.go index 37f04803e..94cb73188 100644 --- a/client/v3/naming/endpoints/endpoints_impl.go +++ b/client/v3/naming/endpoints/endpoints_impl.go @@ -78,7 +78,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts . } func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { - resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + key := m.target + "/" + resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } @@ -112,7 +113,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd lg := m.client.GetLogger() opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()} - wch := m.client.Watch(ctx, m.target, opts...) + key := m.target + "/" + wch := m.client.Watch(ctx, key, opts...) for { select { case <-ctx.Done(): @@ -157,7 +159,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { - resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + key := m.target + "/" + resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go index 980580c16..fc507d7bf 100644 --- a/tests/integration/clientv3/naming/resolver_test.go +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/client/v3/naming/resolver" "go.etcd.io/etcd/pkg/v3/grpc_testing" @@ -112,3 +113,41 @@ func TestEtcdGrpcResolver(t *testing.T) { break } } + +func TestEtcdEndpointManager(t *testing.T) { + integration2.BeforeTest(t) + + s1PayloadBody := []byte{'1'} + s1 := grpc_testing.NewDummyStubServer(s1PayloadBody) + err := s1.Start(nil) + assert.NoError(t, err) + defer s1.Stop() + + s2PayloadBody := []byte{'2'} + s2 := grpc_testing.NewDummyStubServer(s2PayloadBody) + err = s2.Start(nil) + assert.NoError(t, err) + defer s2.Stop() + + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints + em, err := endpoints.NewManager(clus.Client(0), "foo") + assert.NoError(t, err) + emOther, err := endpoints.NewManager(clus.Client(1), "foo_other") + assert.NoError(t, err) + + e1 := endpoints.Endpoint{Addr: s1.Addr()} + e2 := endpoints.Endpoint{Addr: s2.Addr()} + + em.AddEndpoint(context.Background(), "foo/e1", e1) + emOther.AddEndpoint(context.Background(), "foo_other/e2", e2) + + epts, err := em.List(context.Background()) + assert.NoError(t, err) + eptsOther, err := emOther.List(context.Background()) + assert.NoError(t, err) + assert.Equal(t, len(epts), 1) + assert.Equal(t, len(eptsOther), 1) +}