mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3/naming/endpoints: fix endpoints prefix bug
fixes bug with multiple endpoints with same prefix Signed-off-by: Ramil Mirhasanov <ramil600@yahoo.com>
This commit is contained in:
parent
721d9feb0e
commit
2158f21ad5
@ -78,7 +78,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
|
||||
}
|
||||
|
||||
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
key := m.target + "/"
|
||||
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -112,7 +113,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
|
||||
|
||||
lg := m.client.GetLogger()
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
|
||||
wch := m.client.Watch(ctx, m.target, opts...)
|
||||
key := m.target + "/"
|
||||
wch := m.client.Watch(ctx, key, opts...)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -157,7 +159,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
|
||||
}
|
||||
|
||||
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
key := m.target + "/"
|
||||
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/v3/naming/endpoints"
|
||||
"go.etcd.io/etcd/client/v3/naming/resolver"
|
||||
"go.etcd.io/etcd/pkg/v3/grpc_testing"
|
||||
@ -112,3 +113,41 @@ func TestEtcdGrpcResolver(t *testing.T) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdEndpointManager(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
s1PayloadBody := []byte{'1'}
|
||||
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
|
||||
err := s1.Start(nil)
|
||||
assert.NoError(t, err)
|
||||
defer s1.Stop()
|
||||
|
||||
s2PayloadBody := []byte{'2'}
|
||||
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
|
||||
err = s2.Start(nil)
|
||||
assert.NoError(t, err)
|
||||
defer s2.Stop()
|
||||
|
||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
// Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints
|
||||
em, err := endpoints.NewManager(clus.Client(0), "foo")
|
||||
assert.NoError(t, err)
|
||||
emOther, err := endpoints.NewManager(clus.Client(1), "foo_other")
|
||||
assert.NoError(t, err)
|
||||
|
||||
e1 := endpoints.Endpoint{Addr: s1.Addr()}
|
||||
e2 := endpoints.Endpoint{Addr: s2.Addr()}
|
||||
|
||||
em.AddEndpoint(context.Background(), "foo/e1", e1)
|
||||
emOther.AddEndpoint(context.Background(), "foo_other/e2", e2)
|
||||
|
||||
epts, err := em.List(context.Background())
|
||||
assert.NoError(t, err)
|
||||
eptsOther, err := emOther.List(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(epts), 1)
|
||||
assert.Equal(t, len(eptsOther), 1)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user