Merge pull request #15021 from ramil600/em_manager

clientv3/naming/endpoints: fix endpoints prefix bug
This commit is contained in:
Benjamin Wang 2022-12-26 10:19:26 +08:00 committed by GitHub
commit 9169522d72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 3 deletions

View File

@ -92,7 +92,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
} }
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -126,7 +127,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
lg := m.client.GetLogger() lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()} opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...) key := m.target + "/"
wch := m.client.Watch(ctx, key, opts...)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -171,7 +173,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
} }
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -20,6 +20,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints" "go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver" "go.etcd.io/etcd/client/v3/naming/resolver"
"go.etcd.io/etcd/pkg/v3/grpc_testing" "go.etcd.io/etcd/pkg/v3/grpc_testing"
@ -111,3 +112,41 @@ func TestEtcdGrpcResolver(t *testing.T) {
break break
} }
} }
func TestEtcdEndpointManager(t *testing.T) {
integration2.BeforeTest(t)
s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
err := s1.Start(nil)
assert.NoError(t, err)
defer s1.Stop()
s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
err = s2.Start(nil)
assert.NoError(t, err)
defer s2.Stop()
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints
em, err := endpoints.NewManager(clus.Client(0), "foo")
assert.NoError(t, err)
emOther, err := endpoints.NewManager(clus.Client(1), "foo_other")
assert.NoError(t, err)
e1 := endpoints.Endpoint{Addr: s1.Addr()}
e2 := endpoints.Endpoint{Addr: s2.Addr()}
em.AddEndpoint(context.Background(), "foo/e1", e1)
emOther.AddEndpoint(context.Background(), "foo_other/e2", e2)
epts, err := em.List(context.Background())
assert.NoError(t, err)
eptsOther, err := emOther.List(context.Background())
assert.NoError(t, err)
assert.Equal(t, len(epts), 1)
assert.Equal(t, len(eptsOther), 1)
}