mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
[3.4] backport 12675: Cleanup grpc clientv3/naming API
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
7c4696a7e8
commit
e61f1d886b
@ -55,7 +55,7 @@ func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts)
|
|||||||
ops := make([]clientv3.Op, 0, len(updates))
|
ops := make([]clientv3.Op, 0, len(updates))
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
if !strings.HasPrefix(update.Key, m.target+"/") {
|
if !strings.HasPrefix(update.Key, m.target+"/") {
|
||||||
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with %s/", m.target)
|
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
|
||||||
}
|
}
|
||||||
switch update.Op {
|
switch update.Op {
|
||||||
case Add:
|
case Add:
|
||||||
|
@ -1,133 +0,0 @@
|
|||||||
// Copyright 2016 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package naming
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
etcd "go.etcd.io/etcd/clientv3"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/naming"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
|
|
||||||
|
|
||||||
// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
|
|
||||||
type GRPCResolver struct {
|
|
||||||
// Client is an initialized etcd client.
|
|
||||||
Client *etcd.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
|
|
||||||
switch nm.Op {
|
|
||||||
case naming.Add:
|
|
||||||
var v []byte
|
|
||||||
if v, err = json.Marshal(nm); err != nil {
|
|
||||||
return status.Error(codes.InvalidArgument, err.Error())
|
|
||||||
}
|
|
||||||
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
|
|
||||||
case naming.Delete:
|
|
||||||
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
|
|
||||||
default:
|
|
||||||
return status.Error(codes.InvalidArgument, "naming: bad naming op")
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
|
|
||||||
return w, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type gRPCWatcher struct {
|
|
||||||
c *etcd.Client
|
|
||||||
target string
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
wch etcd.WatchChan
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next gets the next set of updates from the etcd resolver.
|
|
||||||
// Calls to Next should be serialized; concurrent calls are not safe since
|
|
||||||
// there is no way to reconcile the update ordering.
|
|
||||||
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
|
|
||||||
if gw.wch == nil {
|
|
||||||
// first Next() returns all addresses
|
|
||||||
return gw.firstNext()
|
|
||||||
}
|
|
||||||
if gw.err != nil {
|
|
||||||
return nil, gw.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// process new events on target/*
|
|
||||||
wr, ok := <-gw.wch
|
|
||||||
if !ok {
|
|
||||||
gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
|
|
||||||
return nil, gw.err
|
|
||||||
}
|
|
||||||
if gw.err = wr.Err(); gw.err != nil {
|
|
||||||
return nil, gw.err
|
|
||||||
}
|
|
||||||
|
|
||||||
updates := make([]*naming.Update, 0, len(wr.Events))
|
|
||||||
for _, e := range wr.Events {
|
|
||||||
var jupdate naming.Update
|
|
||||||
var err error
|
|
||||||
switch e.Type {
|
|
||||||
case etcd.EventTypePut:
|
|
||||||
err = json.Unmarshal(e.Kv.Value, &jupdate)
|
|
||||||
jupdate.Op = naming.Add
|
|
||||||
case etcd.EventTypeDelete:
|
|
||||||
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
|
|
||||||
jupdate.Op = naming.Delete
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
updates = append(updates, &jupdate)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return updates, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
|
|
||||||
// Use serialized request so resolution still works if the target etcd
|
|
||||||
// server is partitioned away from the quorum.
|
|
||||||
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
|
|
||||||
if gw.err = err; err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
updates := make([]*naming.Update, 0, len(resp.Kvs))
|
|
||||||
for _, kv := range resp.Kvs {
|
|
||||||
var jupdate naming.Update
|
|
||||||
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updates = append(updates, &jupdate)
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
|
|
||||||
gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
|
|
||||||
return updates, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gw *gRPCWatcher) Close() { gw.cancel() }
|
|
@ -1,139 +0,0 @@
|
|||||||
// Copyright 2016 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package naming_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
etcd "go.etcd.io/etcd/clientv3"
|
|
||||||
namingv3 "go.etcd.io/etcd/clientv3/naming"
|
|
||||||
"go.etcd.io/etcd/integration"
|
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/naming"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestGRPCResolver(t *testing.T) {
|
|
||||||
defer testutil.AfterTest(t)
|
|
||||||
|
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
|
||||||
defer clus.Terminate(t)
|
|
||||||
|
|
||||||
r := namingv3.GRPCResolver{
|
|
||||||
Client: clus.RandClient(),
|
|
||||||
}
|
|
||||||
|
|
||||||
w, err := r.Resolve("foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("failed to resolve foo", err)
|
|
||||||
}
|
|
||||||
defer w.Close()
|
|
||||||
|
|
||||||
addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"}
|
|
||||||
err = r.Update(context.TODO(), "foo", addOp)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("failed to add foo", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
us, err := w.Next()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("failed to get udpate", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
wu := &naming.Update{
|
|
||||||
Op: naming.Add,
|
|
||||||
Addr: "127.0.0.1",
|
|
||||||
Metadata: "metadata",
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(us[0], wu) {
|
|
||||||
t.Fatalf("up = %#v, want %#v", us[0], wu)
|
|
||||||
}
|
|
||||||
|
|
||||||
delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"}
|
|
||||||
err = r.Update(context.TODO(), "foo", delOp)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to udpate %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
us, err = w.Next()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to get udpate %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
wu = &naming.Update{
|
|
||||||
Op: naming.Delete,
|
|
||||||
Addr: "127.0.0.1",
|
|
||||||
Metadata: "metadata",
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(us[0], wu) {
|
|
||||||
t.Fatalf("up = %#v, want %#v", us[0], wu)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestGRPCResolverMulti ensures the resolver will initialize
|
|
||||||
// correctly with multiple hosts and correctly receive multiple
|
|
||||||
// updates in a single revision.
|
|
||||||
func TestGRPCResolverMulti(t *testing.T) {
|
|
||||||
defer testutil.AfterTest(t)
|
|
||||||
|
|
||||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
|
||||||
defer clus.Terminate(t)
|
|
||||||
c := clus.RandClient()
|
|
||||||
|
|
||||||
v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"})
|
|
||||||
if verr != nil {
|
|
||||||
t.Fatal(verr)
|
|
||||||
}
|
|
||||||
if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
r := namingv3.GRPCResolver{c}
|
|
||||||
|
|
||||||
w, err := r.Resolve("foo")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("failed to resolve foo", err)
|
|
||||||
}
|
|
||||||
defer w.Close()
|
|
||||||
|
|
||||||
updates, nerr := w.Next()
|
|
||||||
if nerr != nil {
|
|
||||||
t.Fatal(nerr)
|
|
||||||
}
|
|
||||||
if len(updates) != 2 {
|
|
||||||
t.Fatalf("expected two updates, got %+v", updates)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
updates, nerr = w.Next()
|
|
||||||
if nerr != nil {
|
|
||||||
t.Fatal(nerr)
|
|
||||||
}
|
|
||||||
if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
|
|
||||||
t.Fatalf("expected two updates, got %+v", updates)
|
|
||||||
}
|
|
||||||
}
|
|
@ -22,12 +22,10 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/clientv3/naming"
|
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
gnaming "google.golang.org/grpc/naming"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// allow maximum 1 retry per second
|
// allow maximum 1 retry per second
|
||||||
@ -36,35 +34,46 @@ const resolveRetryRate = 1
|
|||||||
type clusterProxy struct {
|
type clusterProxy struct {
|
||||||
clus clientv3.Cluster
|
clus clientv3.Cluster
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
gr *naming.GRPCResolver
|
|
||||||
|
|
||||||
// advertise client URL
|
// advertise client URL
|
||||||
advaddr string
|
advaddr string
|
||||||
prefix string
|
prefix string
|
||||||
|
|
||||||
|
em endpoints.Manager
|
||||||
|
|
||||||
umu sync.RWMutex
|
umu sync.RWMutex
|
||||||
umap map[string]gnaming.Update
|
umap map[string]endpoints.Endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
|
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
|
||||||
// The returned channel is closed when there is grpc-proxy endpoint registered
|
// The returned channel is closed when there is grpc-proxy endpoint registered
|
||||||
// and the client's context is canceled so the 'register' loop returns.
|
// and the client's context is canceled so the 'register' loop returns.
|
||||||
|
// TODO: Expand the API to report creation errors
|
||||||
func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
|
func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
|
||||||
|
var em endpoints.Manager
|
||||||
|
if advaddr != "" && prefix != "" {
|
||||||
|
var err error
|
||||||
|
if em, err = endpoints.NewManager(c, prefix); err != nil {
|
||||||
|
plog.Errorf("failed to provision endpointsManager %q (%v)", prefix, err)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
cp := &clusterProxy{
|
cp := &clusterProxy{
|
||||||
clus: c.Cluster,
|
clus: c.Cluster,
|
||||||
ctx: c.Ctx(),
|
ctx: c.Ctx(),
|
||||||
gr: &naming.GRPCResolver{Client: c},
|
|
||||||
|
|
||||||
advaddr: advaddr,
|
advaddr: advaddr,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
umap: make(map[string]gnaming.Update),
|
umap: make(map[string]endpoints.Endpoint),
|
||||||
|
em: em,
|
||||||
}
|
}
|
||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
if advaddr != "" && prefix != "" {
|
if em != nil {
|
||||||
go func() {
|
go func() {
|
||||||
defer close(donec)
|
defer close(donec)
|
||||||
cp.resolve(prefix)
|
cp.establishEndpointWatch(prefix)
|
||||||
}()
|
}()
|
||||||
return cp, donec
|
return cp, donec
|
||||||
}
|
}
|
||||||
@ -73,38 +82,36 @@ func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.Clus
|
|||||||
return cp, donec
|
return cp, donec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *clusterProxy) resolve(prefix string) {
|
func (cp *clusterProxy) establishEndpointWatch(prefix string) {
|
||||||
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
|
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
|
||||||
for rm.Wait(cp.ctx) == nil {
|
for rm.Wait(cp.ctx) == nil {
|
||||||
wa, err := cp.gr.Resolve(prefix)
|
wc, err := cp.em.NewWatchChannel(cp.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Warningf("failed to resolve %q (%v)", prefix, err)
|
plog.Warningf("failed to establish endpoint watch %q (%v)", prefix, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cp.monitor(wa)
|
cp.monitor(wc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
|
func (cp *clusterProxy) monitor(wc endpoints.WatchChannel) {
|
||||||
for cp.ctx.Err() == nil {
|
for {
|
||||||
ups, err := wa.Next()
|
select {
|
||||||
if err != nil {
|
case <-cp.ctx.Done():
|
||||||
plog.Warningf("clusterProxy watcher error (%v)", err)
|
plog.Info("watching endpoints interrupted (%v)", cp.ctx.Err())
|
||||||
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
|
return
|
||||||
return
|
case updates := <-wc:
|
||||||
|
cp.umu.Lock()
|
||||||
|
for _, up := range updates {
|
||||||
|
switch up.Op {
|
||||||
|
case endpoints.Add:
|
||||||
|
cp.umap[up.Endpoint.Addr] = up.Endpoint
|
||||||
|
case endpoints.Delete:
|
||||||
|
delete(cp.umap, up.Endpoint.Addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
cp.umu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.umu.Lock()
|
|
||||||
for i := range ups {
|
|
||||||
switch ups[i].Op {
|
|
||||||
case gnaming.Add:
|
|
||||||
cp.umap[ups[i].Addr] = *ups[i]
|
|
||||||
case gnaming.Delete:
|
|
||||||
delete(cp.umap, ups[i].Addr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cp.umu.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,10 +20,9 @@ import (
|
|||||||
|
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/clientv3/concurrency"
|
"go.etcd.io/etcd/clientv3/concurrency"
|
||||||
"go.etcd.io/etcd/clientv3/naming"
|
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
gnaming "google.golang.org/grpc/naming"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// allow maximum 1 retry per second
|
// allow maximum 1 retry per second
|
||||||
@ -67,8 +66,12 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
gr := &naming.GRPCResolver{Client: c}
|
em, err := endpoints.NewManager(c, prefix)
|
||||||
if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()}
|
||||||
|
if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,11 +19,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
"go.etcd.io/etcd/clientv3/naming"
|
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||||
"go.etcd.io/etcd/integration"
|
"go.etcd.io/etcd/integration"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
|
||||||
gnaming "google.golang.org/grpc/naming"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegister(t *testing.T) {
|
func TestRegister(t *testing.T) {
|
||||||
@ -35,26 +33,16 @@ func TestRegister(t *testing.T) {
|
|||||||
paddr := clus.Members[0].GRPCAddr()
|
paddr := clus.Members[0].GRPCAddr()
|
||||||
|
|
||||||
testPrefix := "test-name"
|
testPrefix := "test-name"
|
||||||
wa := createWatcher(t, cli, testPrefix)
|
wa := mustCreateWatcher(t, cli, testPrefix)
|
||||||
ups, err := wa.Next()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(ups) != 0 {
|
|
||||||
t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
|
|
||||||
}
|
|
||||||
|
|
||||||
donec := Register(cli, testPrefix, paddr, 5)
|
donec := Register(cli, testPrefix, paddr, 5)
|
||||||
|
|
||||||
ups, err = wa.Next()
|
ups := <-wa
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(ups) != 1 {
|
if len(ups) != 1 {
|
||||||
t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups)
|
t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups)
|
||||||
}
|
}
|
||||||
if ups[0].Addr != paddr {
|
if ups[0].Endpoint.Addr != paddr {
|
||||||
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr)
|
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
cli.Close()
|
cli.Close()
|
||||||
@ -66,11 +54,14 @@ func TestRegister(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher {
|
func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel {
|
||||||
gr := &naming.GRPCResolver{Client: c}
|
em, err := endpoints.NewManager(c, prefix)
|
||||||
watcher, err := gr.Resolve(prefix)
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create endpoints.Manager: %v", err)
|
||||||
|
}
|
||||||
|
wc, err := em.NewWatchChannel(c.Ctx())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to resolve %q (%v)", prefix, err)
|
t.Fatalf("failed to resolve %q (%v)", prefix, err)
|
||||||
}
|
}
|
||||||
return watcher
|
return wc
|
||||||
}
|
}
|
||||||
|
4
test
4
test
@ -192,7 +192,8 @@ function integration_pass {
|
|||||||
INTEGTESTPKG=("${REPO_PATH}/integration"
|
INTEGTESTPKG=("${REPO_PATH}/integration"
|
||||||
"${REPO_PATH}/client/integration"
|
"${REPO_PATH}/client/integration"
|
||||||
"${REPO_PATH}/clientv3/integration/..."
|
"${REPO_PATH}/clientv3/integration/..."
|
||||||
"${REPO_PATH}/contrib/raftexample")
|
"${REPO_PATH}/contrib/raftexample"
|
||||||
|
"${REPO_PATH}/proxy/grpcproxy")
|
||||||
else
|
else
|
||||||
INTEGTESTPKG=("${TEST[@]}")
|
INTEGTESTPKG=("${TEST[@]}")
|
||||||
fi
|
fi
|
||||||
@ -205,6 +206,7 @@ function integration_extra {
|
|||||||
go test -timeout 25m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/clientv3/integration/..."
|
go test -timeout 25m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/clientv3/integration/..."
|
||||||
go test -timeout 1m -v -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/contrib/raftexample"
|
go test -timeout 1m -v -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/contrib/raftexample"
|
||||||
go test -timeout 5m -v ${RACE} -tags v2v3 "$@" "${REPO_PATH}/etcdserver/api/v2store"
|
go test -timeout 5m -v ${RACE} -tags v2v3 "$@" "${REPO_PATH}/etcdserver/api/v2store"
|
||||||
|
go test -timeout 5m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/proxy/grpcproxy"
|
||||||
go test -timeout 1m -v ${RACE} -cpu "${TEST_CPUS}" -run=Example "$@" "${TEST[@]}"
|
go test -timeout 1m -v ${RACE} -cpu "${TEST_CPUS}" -run=Example "$@" "${TEST[@]}"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user