mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16800 from chaochn47/release-3.4-clientv3-naming-impl
[3.4] Backport clientv3 naming implementation
This commit is contained in:
commit
e031012f74
@ -18,7 +18,7 @@ import (
|
||||
|
||||
cli, cerr := clientv3.NewFromURL("http://localhost:2379")
|
||||
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
|
||||
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
|
||||
conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
|
||||
```
|
||||
|
||||
## Managing service endpoints
|
||||
@ -84,4 +84,4 @@ em := endpoints.NewManager(c, "foo")
|
||||
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
|
||||
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
|
||||
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
|
||||
```
|
||||
```
|
||||
|
@ -26,18 +26,19 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.etcd.io/etcd/clientv3/balancer"
|
||||
"go.etcd.io/etcd/clientv3/balancer/picker"
|
||||
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
|
||||
"go.etcd.io/etcd/clientv3/credentials"
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/pkg/logutil"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
grpccredentials "google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.etcd.io/etcd/clientv3/balancer"
|
||||
"go.etcd.io/etcd/clientv3/balancer/picker"
|
||||
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
|
||||
"go.etcd.io/etcd/clientv3/credentials"
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/pkg/logutil"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -95,7 +96,8 @@ type Client struct {
|
||||
|
||||
callOpts []grpc.CallOption
|
||||
|
||||
lg *zap.Logger
|
||||
lgMu *sync.RWMutex
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
// New creates a new etcdv3 client from a given configuration.
|
||||
@ -112,7 +114,7 @@ func New(cfg Config) (*Client, error) {
|
||||
// service interface implementations and do not need connection management.
|
||||
func NewCtxClient(ctx context.Context) *Client {
|
||||
cctx, cancel := context.WithCancel(ctx)
|
||||
return &Client{ctx: cctx, cancel: cancel}
|
||||
return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
|
||||
}
|
||||
|
||||
// NewFromURL creates a new etcdv3 client from a URL.
|
||||
@ -125,6 +127,23 @@ func NewFromURLs(urls []string) (*Client, error) {
|
||||
return New(Config{Endpoints: urls})
|
||||
}
|
||||
|
||||
// WithLogger sets a logger
|
||||
func (c *Client) WithLogger(lg *zap.Logger) *Client {
|
||||
c.lgMu.Lock()
|
||||
c.lg = lg
|
||||
c.lgMu.Unlock()
|
||||
return c
|
||||
}
|
||||
|
||||
// GetLogger gets the logger.
|
||||
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
|
||||
func (c *Client) GetLogger() *zap.Logger {
|
||||
c.lgMu.RLock()
|
||||
l := c.lg
|
||||
c.lgMu.RUnlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// Close shuts down the client's etcd connections.
|
||||
func (c *Client) Close() error {
|
||||
c.cancel()
|
||||
@ -423,6 +442,7 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
cancel: cancel,
|
||||
mu: new(sync.RWMutex),
|
||||
callOpts: defaultCallOpts,
|
||||
lgMu: new(sync.RWMutex),
|
||||
}
|
||||
|
||||
lcfg := logutil.DefaultZapLoggerConfig
|
||||
|
@ -27,8 +27,6 @@ import (
|
||||
)
|
||||
|
||||
func TestEndpointManager(t *testing.T) {
|
||||
t.Skip("Not implemented yet")
|
||||
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) {
|
||||
us := <-w
|
||||
|
||||
if us == nil {
|
||||
t.Fatal("failed to get update", err)
|
||||
t.Fatal("failed to get update")
|
||||
}
|
||||
|
||||
wu := endpoints.Update{
|
||||
wu := &endpoints.Update{
|
||||
Op: endpoints.Add,
|
||||
Key: "foo/a1",
|
||||
Endpoint: e1,
|
||||
@ -69,21 +67,21 @@ func TestEndpointManager(t *testing.T) {
|
||||
|
||||
err = em.DeleteEndpoint(context.TODO(), "foo/a1")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to udpate %v", err)
|
||||
t.Fatalf("failed to update %v", err)
|
||||
}
|
||||
|
||||
us = <-w
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get udpate %v", err)
|
||||
if us == nil {
|
||||
t.Fatal("failed to get update")
|
||||
}
|
||||
|
||||
wu = endpoints.Update{
|
||||
wu = &endpoints.Update{
|
||||
Op: endpoints.Delete,
|
||||
Key: "foo/a1",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(us, wu) {
|
||||
t.Fatalf("up = %#v, want %#v", us[1], wu)
|
||||
if !reflect.DeepEqual(us[0], wu) {
|
||||
t.Fatalf("up = %#v, want %#v", us[0], wu)
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,8 +89,6 @@ func TestEndpointManager(t *testing.T) {
|
||||
// correctly with multiple hosts and correctly receive multiple
|
||||
// updates in a single revision.
|
||||
func TestEndpointManagerAtomicity(t *testing.T) {
|
||||
t.Skip("Not implemented yet")
|
||||
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
@ -133,3 +129,84 @@ func TestEndpointManagerAtomicity(t *testing.T) {
|
||||
t.Fatalf("expected two delete updates, got %+v", updates)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointManagerCRUD(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
em, err := endpoints.NewManager(clus.RandClient(), "foo")
|
||||
if err != nil {
|
||||
t.Fatal("failed to create EndpointManager", err)
|
||||
}
|
||||
|
||||
// Add
|
||||
k1 := "foo/a1"
|
||||
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"}
|
||||
err = em.AddEndpoint(context.TODO(), k1, e1)
|
||||
if err != nil {
|
||||
t.Fatal("failed to add", k1, err)
|
||||
}
|
||||
|
||||
k2 := "foo/a2"
|
||||
e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"}
|
||||
err = em.AddEndpoint(context.TODO(), k2, e2)
|
||||
if err != nil {
|
||||
t.Fatal("failed to add", k2, err)
|
||||
}
|
||||
|
||||
eps, err := em.List(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatal("failed to list foo")
|
||||
}
|
||||
if len(eps) != 2 {
|
||||
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
|
||||
}
|
||||
if !reflect.DeepEqual(eps[k1], e1) {
|
||||
t.Fatalf("unexpected endpoints: %s", k1)
|
||||
}
|
||||
if !reflect.DeepEqual(eps[k2], e2) {
|
||||
t.Fatalf("unexpected endpoints: %s", k2)
|
||||
}
|
||||
|
||||
// Delete
|
||||
err = em.DeleteEndpoint(context.TODO(), k1)
|
||||
if err != nil {
|
||||
t.Fatal("failed to delete", k2, err)
|
||||
}
|
||||
|
||||
eps, err = em.List(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatal("failed to list foo")
|
||||
}
|
||||
if len(eps) != 1 {
|
||||
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
|
||||
}
|
||||
if !reflect.DeepEqual(eps[k2], e2) {
|
||||
t.Fatalf("unexpected endpoints: %s", k2)
|
||||
}
|
||||
|
||||
// Update
|
||||
k3 := "foo/a3"
|
||||
e3 := endpoints.Endpoint{Addr: "127.0.0.3", Metadata: "metadata3"}
|
||||
updates := []*endpoints.UpdateWithOpts{
|
||||
{Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}},
|
||||
{Update: endpoints.Update{Op: endpoints.Delete, Key: k2}},
|
||||
}
|
||||
err = em.Update(context.TODO(), updates)
|
||||
if err != nil {
|
||||
t.Fatal("failed to update", err)
|
||||
}
|
||||
|
||||
eps, err = em.List(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatal("failed to list foo")
|
||||
}
|
||||
if len(eps) != 1 {
|
||||
t.Fatalf("unexpected the number of endpoints: %d", len(eps))
|
||||
}
|
||||
if !reflect.DeepEqual(eps[k3], e3) {
|
||||
t.Fatalf("unexpected endpoints: %s", k3)
|
||||
}
|
||||
}
|
||||
|
@ -15,56 +15,112 @@
|
||||
package naming
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||
"go.etcd.io/etcd/clientv3/naming/resolver"
|
||||
"go.etcd.io/etcd/integration"
|
||||
grpctest "go.etcd.io/etcd/pkg/grpc_testing"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
// This test mimics scenario described in grpc_naming.md doc.
|
||||
|
||||
func TestEtcdGrpcResolver(t *testing.T) {
|
||||
t.Skip("Not implemented yet")
|
||||
|
||||
defer testutil.AfterTest(t)
|
||||
s1PayloadBody := []byte{'1'}
|
||||
s1 := newDummyStubServer(s1PayloadBody)
|
||||
if err := s1.Start(nil); err != nil {
|
||||
t.Fatal("failed to start dummy grpc server (s1)", err)
|
||||
}
|
||||
defer s1.Stop()
|
||||
|
||||
// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
|
||||
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
|
||||
s2PayloadBody := []byte{'2'}
|
||||
s2 := newDummyStubServer(s2PayloadBody)
|
||||
if err := s2.Start(nil); err != nil {
|
||||
t.Fatal("failed to start dummy grpc server (s2)", err)
|
||||
}
|
||||
defer s2.Stop()
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
em, err := endpoints.NewManager(clus.RandClient(), "foo")
|
||||
em, err := endpoints.NewManager(clus.Client(0), "foo")
|
||||
if err != nil {
|
||||
t.Fatal("failed to create EndpointManager", err)
|
||||
}
|
||||
|
||||
e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
|
||||
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
|
||||
e1 := endpoints.Endpoint{Addr: s1.Addr()}
|
||||
e2 := endpoints.Endpoint{Addr: s2.Addr()}
|
||||
|
||||
err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
|
||||
if err != nil {
|
||||
t.Fatal("failed to add foo", err)
|
||||
}
|
||||
etcdResolver, err := resolver.NewBuilder(clus.RandClient())
|
||||
|
||||
conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
|
||||
b, err := resolver.NewBuilder(clus.Client(1))
|
||||
if err != nil {
|
||||
t.Fatal("failed to connect to foo (e1)", err)
|
||||
t.Fatal("failed to new resolver builder", err)
|
||||
}
|
||||
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
|
||||
if err != nil {
|
||||
t.Fatal("failed to connect to foo", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// TODO: send requests to conn, ensure s1 received it.
|
||||
c := testpb.NewTestServiceClient(conn)
|
||||
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
t.Fatal("failed to invoke rpc to foo (e1)", err)
|
||||
}
|
||||
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
|
||||
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
|
||||
}
|
||||
|
||||
em.DeleteEndpoint(context.TODO(), "foo/e1")
|
||||
em.AddEndpoint(context.TODO(), "foo/e2", e2)
|
||||
|
||||
// TODO: Send requests to conn and make sure s2 receive it.
|
||||
// Might require restarting s1 to break the existing (open) connection.
|
||||
// We use a loop with deadline of 30s to avoid test getting flake
|
||||
// as it's asynchronous for gRPC Client to update underlying connections.
|
||||
maxRetries := 300
|
||||
retryPeriod := 100 * time.Millisecond
|
||||
retries := 0
|
||||
for {
|
||||
time.Sleep(retryPeriod)
|
||||
retries++
|
||||
|
||||
conn.GetState() // this line is to avoid compiler warning that conn is unused.
|
||||
resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
|
||||
if err != nil {
|
||||
if retries < maxRetries {
|
||||
continue
|
||||
}
|
||||
t.Fatal("failed to invoke rpc to foo (e2)", err)
|
||||
}
|
||||
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
|
||||
if retries < maxRetries {
|
||||
continue
|
||||
}
|
||||
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func newDummyStubServer(body []byte) *grpctest.StubServer {
|
||||
return &grpctest.StubServer{
|
||||
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||
return &testpb.SimpleResponse{
|
||||
Payload: &testpb.Payload{
|
||||
Type: testpb.PayloadType_COMPRESSABLE,
|
||||
Body: body,
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ type Update struct {
|
||||
}
|
||||
|
||||
// WatchChannel is used to deliver notifications about endpoints updates.
|
||||
type WatchChannel chan []*Update
|
||||
type WatchChannel <-chan []*Update
|
||||
|
||||
// Key2EndpointMap maps etcd key into struct describing the endpoint.
|
||||
type Key2EndpointMap map[string]Endpoint
|
||||
|
@ -18,37 +18,66 @@ package endpoints
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints/internal"
|
||||
)
|
||||
|
||||
type endpointManager struct {
|
||||
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
|
||||
client *clientv3.Client
|
||||
target string
|
||||
}
|
||||
|
||||
func NewManager(client *clientv3.Client, target string) (Manager, error) {
|
||||
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
|
||||
return nil, fmt.Errorf("Not implemented yet")
|
||||
if client == nil {
|
||||
return nil, errors.New("invalid etcd client")
|
||||
}
|
||||
|
||||
if target == "" {
|
||||
return nil, errors.New("invalid target")
|
||||
}
|
||||
|
||||
em := &endpointManager{
|
||||
client: client,
|
||||
target: target,
|
||||
}
|
||||
return em, nil
|
||||
}
|
||||
|
||||
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
|
||||
// TODO: For loop in a single transaction:
|
||||
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
|
||||
switch internalUpdate.Op {
|
||||
//case internal.Add:
|
||||
// var v []byte
|
||||
// if v, err = json.Marshal(internalUpdate); err != nil {
|
||||
// return status.Error(codes.InvalidArgument, err.Error())
|
||||
// }
|
||||
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
|
||||
//case internal.Delete:
|
||||
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
|
||||
//default:
|
||||
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
|
||||
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) {
|
||||
ops := make([]clientv3.Op, 0, len(updates))
|
||||
for _, update := range updates {
|
||||
if !strings.HasPrefix(update.Key, m.target+"/") {
|
||||
return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key)
|
||||
}
|
||||
switch update.Op {
|
||||
case Add:
|
||||
internalUpdate := &internal.Update{
|
||||
Op: internal.Add,
|
||||
Addr: update.Endpoint.Addr,
|
||||
Metadata: update.Endpoint.Metadata,
|
||||
}
|
||||
|
||||
var v []byte
|
||||
if v, err = json.Marshal(internalUpdate); err != nil {
|
||||
return status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...))
|
||||
case Delete:
|
||||
ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...))
|
||||
default:
|
||||
return status.Error(codes.InvalidArgument, "endpoints: bad update op")
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Not implemented yet")
|
||||
_, err = m.client.KV.Txn(ctx).Then(ops...).Commit()
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
|
||||
@ -60,76 +89,98 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
|
||||
}
|
||||
|
||||
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
|
||||
return nil, fmt.Errorf("Not implemented yet")
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: Implementation to be inspired by:
|
||||
// Next gets the next set of updates from the etcd resolver.
|
||||
//// Calls to Next should be serialized; concurrent calls are not safe since
|
||||
//// there is no way to reconcile the update ordering.
|
||||
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
|
||||
// if gw.wch == nil {
|
||||
// // first Next() returns all addresses
|
||||
// return gw.firstNext()
|
||||
// }
|
||||
// if gw.err != nil {
|
||||
// return nil, gw.err
|
||||
// }
|
||||
//
|
||||
// // process new events on target/*
|
||||
// wr, ok := <-gw.wch
|
||||
// if !ok {
|
||||
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
|
||||
// return nil, gw.err
|
||||
// }
|
||||
// if gw.err = wr.Err(); gw.err != nil {
|
||||
// return nil, gw.err
|
||||
// }
|
||||
//
|
||||
// updates := make([]*naming.Update, 0, len(wr.Events))
|
||||
// for _, e := range wr.Events {
|
||||
// var jupdate naming.Update
|
||||
// var err error
|
||||
// switch e.Type {
|
||||
// case etcd.EventTypePut:
|
||||
// err = json.Unmarshal(e.Kv.Value, &jupdate)
|
||||
// jupdate.Op = naming.Add
|
||||
// case etcd.EventTypeDelete:
|
||||
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
|
||||
// jupdate.Op = naming.Delete
|
||||
// default:
|
||||
// continue
|
||||
// }
|
||||
// if err == nil {
|
||||
// updates = append(updates, &jupdate)
|
||||
// }
|
||||
// }
|
||||
// return updates, nil
|
||||
//}
|
||||
//
|
||||
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
|
||||
// // Use serialized request so resolution still works if the target etcd
|
||||
// // server is partitioned away from the quorum.
|
||||
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
|
||||
// if gw.err = err; err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// updates := make([]*naming.Update, 0, len(resp.Kvs))
|
||||
// for _, kv := range resp.Kvs {
|
||||
// var jupdate naming.Update
|
||||
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
|
||||
// continue
|
||||
// }
|
||||
// updates = append(updates, &jupdate)
|
||||
// }
|
||||
//
|
||||
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
|
||||
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
|
||||
// return updates, nil
|
||||
//}
|
||||
lg := m.client.GetLogger()
|
||||
initUpdates := make([]*Update, 0, len(resp.Kvs))
|
||||
for _, kv := range resp.Kvs {
|
||||
var iup internal.Update
|
||||
if err := json.Unmarshal(kv.Value, &iup); err != nil {
|
||||
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
up := &Update{
|
||||
Op: Add,
|
||||
Key: string(kv.Key),
|
||||
Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
|
||||
}
|
||||
initUpdates = append(initUpdates, up)
|
||||
}
|
||||
|
||||
upch := make(chan []*Update, 1)
|
||||
if len(initUpdates) > 0 {
|
||||
upch <- initUpdates
|
||||
}
|
||||
go m.watch(ctx, resp.Header.Revision+1, upch)
|
||||
return upch, nil
|
||||
}
|
||||
|
||||
func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
|
||||
defer close(upch)
|
||||
|
||||
lg := m.client.GetLogger()
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
|
||||
wch := m.client.Watch(ctx, m.target, opts...)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case wresp, ok := <-wch:
|
||||
if !ok {
|
||||
lg.Warn("watch closed", zap.String("target", m.target))
|
||||
return
|
||||
}
|
||||
if wresp.Err() != nil {
|
||||
lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
|
||||
return
|
||||
}
|
||||
|
||||
deltaUps := make([]*Update, 0, len(wresp.Events))
|
||||
for _, e := range wresp.Events {
|
||||
var iup internal.Update
|
||||
var err error
|
||||
var op Operation
|
||||
switch e.Type {
|
||||
case clientv3.EventTypePut:
|
||||
err = json.Unmarshal(e.Kv.Value, &iup)
|
||||
op = Add
|
||||
if err != nil {
|
||||
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
case clientv3.EventTypeDelete:
|
||||
iup = internal.Update{Op: internal.Delete}
|
||||
op = Delete
|
||||
default:
|
||||
continue
|
||||
}
|
||||
up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
|
||||
deltaUps = append(deltaUps, up)
|
||||
}
|
||||
if len(deltaUps) > 0 {
|
||||
upch <- deltaUps
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
|
||||
// TODO: Implementation
|
||||
return nil, fmt.Errorf("Not implemented yet")
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eps := make(Key2EndpointMap)
|
||||
for _, kv := range resp.Kvs {
|
||||
var iup internal.Update
|
||||
if err := json.Unmarshal(kv.Value, &iup); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}
|
||||
}
|
||||
return eps, nil
|
||||
}
|
||||
|
@ -1,133 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package naming
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
etcd "go.etcd.io/etcd/clientv3"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/naming"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
|
||||
|
||||
// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
|
||||
type GRPCResolver struct {
|
||||
// Client is an initialized etcd client.
|
||||
Client *etcd.Client
|
||||
}
|
||||
|
||||
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
|
||||
switch nm.Op {
|
||||
case naming.Add:
|
||||
var v []byte
|
||||
if v, err = json.Marshal(nm); err != nil {
|
||||
return status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
|
||||
case naming.Delete:
|
||||
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
|
||||
default:
|
||||
return status.Error(codes.InvalidArgument, "naming: bad naming op")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
type gRPCWatcher struct {
|
||||
c *etcd.Client
|
||||
target string
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wch etcd.WatchChan
|
||||
err error
|
||||
}
|
||||
|
||||
// Next gets the next set of updates from the etcd resolver.
|
||||
// Calls to Next should be serialized; concurrent calls are not safe since
|
||||
// there is no way to reconcile the update ordering.
|
||||
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
|
||||
if gw.wch == nil {
|
||||
// first Next() returns all addresses
|
||||
return gw.firstNext()
|
||||
}
|
||||
if gw.err != nil {
|
||||
return nil, gw.err
|
||||
}
|
||||
|
||||
// process new events on target/*
|
||||
wr, ok := <-gw.wch
|
||||
if !ok {
|
||||
gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
|
||||
return nil, gw.err
|
||||
}
|
||||
if gw.err = wr.Err(); gw.err != nil {
|
||||
return nil, gw.err
|
||||
}
|
||||
|
||||
updates := make([]*naming.Update, 0, len(wr.Events))
|
||||
for _, e := range wr.Events {
|
||||
var jupdate naming.Update
|
||||
var err error
|
||||
switch e.Type {
|
||||
case etcd.EventTypePut:
|
||||
err = json.Unmarshal(e.Kv.Value, &jupdate)
|
||||
jupdate.Op = naming.Add
|
||||
case etcd.EventTypeDelete:
|
||||
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
|
||||
jupdate.Op = naming.Delete
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if err == nil {
|
||||
updates = append(updates, &jupdate)
|
||||
}
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
|
||||
// Use serialized request so resolution still works if the target etcd
|
||||
// server is partitioned away from the quorum.
|
||||
resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
|
||||
if gw.err = err; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updates := make([]*naming.Update, 0, len(resp.Kvs))
|
||||
for _, kv := range resp.Kvs {
|
||||
var jupdate naming.Update
|
||||
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
|
||||
continue
|
||||
}
|
||||
updates = append(updates, &jupdate)
|
||||
}
|
||||
|
||||
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
|
||||
gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func (gw *gRPCWatcher) Close() { gw.cancel() }
|
@ -1,139 +0,0 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package naming_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
etcd "go.etcd.io/etcd/clientv3"
|
||||
namingv3 "go.etcd.io/etcd/clientv3/naming"
|
||||
"go.etcd.io/etcd/integration"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
|
||||
"google.golang.org/grpc/naming"
|
||||
)
|
||||
|
||||
func TestGRPCResolver(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
r := namingv3.GRPCResolver{
|
||||
Client: clus.RandClient(),
|
||||
}
|
||||
|
||||
w, err := r.Resolve("foo")
|
||||
if err != nil {
|
||||
t.Fatal("failed to resolve foo", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"}
|
||||
err = r.Update(context.TODO(), "foo", addOp)
|
||||
if err != nil {
|
||||
t.Fatal("failed to add foo", err)
|
||||
}
|
||||
|
||||
us, err := w.Next()
|
||||
if err != nil {
|
||||
t.Fatal("failed to get udpate", err)
|
||||
}
|
||||
|
||||
wu := &naming.Update{
|
||||
Op: naming.Add,
|
||||
Addr: "127.0.0.1",
|
||||
Metadata: "metadata",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(us[0], wu) {
|
||||
t.Fatalf("up = %#v, want %#v", us[0], wu)
|
||||
}
|
||||
|
||||
delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"}
|
||||
err = r.Update(context.TODO(), "foo", delOp)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to udpate %v", err)
|
||||
}
|
||||
|
||||
us, err = w.Next()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get udpate %v", err)
|
||||
}
|
||||
|
||||
wu = &naming.Update{
|
||||
Op: naming.Delete,
|
||||
Addr: "127.0.0.1",
|
||||
Metadata: "metadata",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(us[0], wu) {
|
||||
t.Fatalf("up = %#v, want %#v", us[0], wu)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGRPCResolverMulti ensures the resolver will initialize
|
||||
// correctly with multiple hosts and correctly receive multiple
|
||||
// updates in a single revision.
|
||||
func TestGRPCResolverMulti(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
c := clus.RandClient()
|
||||
|
||||
v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"})
|
||||
if verr != nil {
|
||||
t.Fatal(verr)
|
||||
}
|
||||
if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := namingv3.GRPCResolver{c}
|
||||
|
||||
w, err := r.Resolve("foo")
|
||||
if err != nil {
|
||||
t.Fatal("failed to resolve foo", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
updates, nerr := w.Next()
|
||||
if nerr != nil {
|
||||
t.Fatal(nerr)
|
||||
}
|
||||
if len(updates) != 2 {
|
||||
t.Fatalf("expected two updates, got %+v", updates)
|
||||
}
|
||||
|
||||
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
updates, nerr = w.Next()
|
||||
if nerr != nil {
|
||||
t.Fatal(nerr)
|
||||
}
|
||||
if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) {
|
||||
t.Fatalf("expected two updates, got %+v", updates)
|
||||
}
|
||||
}
|
@ -15,25 +15,107 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/resolver"
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
gresolver "google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||
)
|
||||
|
||||
type builder struct {
|
||||
// ...
|
||||
c *clientv3.Client
|
||||
}
|
||||
|
||||
func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||
// To be implemented...
|
||||
// Using endpoints.NewWatcher() to subscribe for endpoints changes.
|
||||
return nil, nil
|
||||
func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
|
||||
r := &resolver{
|
||||
c: b.c,
|
||||
target: target.Endpoint,
|
||||
cc: cc,
|
||||
}
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
|
||||
em, err := endpoints.NewManager(r.c, r.target)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err)
|
||||
}
|
||||
r.wch, err = em.NewWatchChannel(r.ctx)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channer: %s", err)
|
||||
}
|
||||
|
||||
r.wg.Add(1)
|
||||
go r.watch()
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (b builder) Scheme() string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
|
||||
return builder{}, nil
|
||||
// NewBuilder creates a resolver builder.
|
||||
func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) {
|
||||
return builder{c: client}, nil
|
||||
}
|
||||
|
||||
type resolver struct {
|
||||
c *clientv3.Client
|
||||
target string
|
||||
cc gresolver.ClientConn
|
||||
wch endpoints.WatchChannel
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (r *resolver) watch() {
|
||||
defer r.wg.Done()
|
||||
|
||||
allUps := make(map[string]*endpoints.Update)
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
case ups, ok := <-r.wch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for _, up := range ups {
|
||||
switch up.Op {
|
||||
case endpoints.Add:
|
||||
allUps[up.Key] = up
|
||||
case endpoints.Delete:
|
||||
delete(allUps, up.Key)
|
||||
}
|
||||
}
|
||||
|
||||
addrs := convertToGRPCAddress(allUps)
|
||||
r.cc.UpdateState(gresolver.State{Addresses: addrs})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address {
|
||||
var addrs []gresolver.Address
|
||||
for _, up := range ups {
|
||||
addr := gresolver.Address{
|
||||
Addr: up.Endpoint.Addr,
|
||||
Metadata: up.Endpoint.Metadata,
|
||||
}
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// ResolveNow is a no-op here.
|
||||
// It's just a hint, resolver can ignore this if it's not necessary.
|
||||
func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {}
|
||||
|
||||
func (r *resolver) Close() {
|
||||
r.cancel()
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
100
pkg/grpc_testing/stub_server.go
Normal file
100
pkg/grpc_testing/stub_server.go
Normal file
@ -0,0 +1,100 @@
|
||||
// Copyright 2023 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package grpc_testing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
)
|
||||
|
||||
// StubServer is borrowed from the interal package of grpc-go.
|
||||
// See https://github.com/grpc/grpc-go/blob/master/internal/stubserver/stubserver.go
|
||||
// Since it cannot be imported directly, we have to copy and paste it here,
|
||||
// and useless code for our testing is removed.
|
||||
|
||||
// StubServer is a server that is easy to customize within individual test
|
||||
// cases.
|
||||
type StubServer struct {
|
||||
// Guarantees we satisfy this interface; panics if unimplemented methods are called.
|
||||
testpb.TestServiceServer
|
||||
|
||||
EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
|
||||
UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
|
||||
FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error
|
||||
|
||||
s *grpc.Server
|
||||
|
||||
// Network and Address are parameters for Listen. Defaults will be used if these are empty before Start.
|
||||
Network string
|
||||
Address string
|
||||
|
||||
cleanups []func() // Lambdas executed in Stop(); populated by Start().
|
||||
}
|
||||
|
||||
// EmptyCall is the handler for testpb.EmptyCall.
|
||||
func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||
return ss.EmptyCallF(ctx, in)
|
||||
}
|
||||
|
||||
// UnaryCall is the handler for testpb.UnaryCall.
|
||||
func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||
return ss.UnaryCallF(ctx, in)
|
||||
}
|
||||
|
||||
// FullDuplexCall is the handler for testpb.FullDuplexCall.
|
||||
func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
|
||||
return ss.FullDuplexCallF(stream)
|
||||
}
|
||||
|
||||
// Start starts the server and creates a client connected to it.
|
||||
func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
|
||||
if ss.Network == "" {
|
||||
ss.Network = "tcp"
|
||||
}
|
||||
if ss.Address == "" {
|
||||
ss.Address = "localhost:0"
|
||||
}
|
||||
|
||||
lis, err := net.Listen(ss.Network, ss.Address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
|
||||
}
|
||||
ss.Address = lis.Addr().String()
|
||||
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
|
||||
|
||||
s := grpc.NewServer(sopts...)
|
||||
testpb.RegisterTestServiceServer(s, ss)
|
||||
go s.Serve(lis)
|
||||
ss.cleanups = append(ss.cleanups, s.Stop)
|
||||
ss.s = s
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops ss and cleans up all resources it consumed.
|
||||
func (ss *StubServer) Stop() {
|
||||
for i := len(ss.cleanups) - 1; i >= 0; i-- {
|
||||
ss.cleanups[i]()
|
||||
}
|
||||
}
|
||||
|
||||
// Addr gets the address the server listening on.
|
||||
func (ss *StubServer) Addr() string {
|
||||
return ss.Address
|
||||
}
|
@ -22,12 +22,10 @@ import (
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/clientv3/naming"
|
||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
gnaming "google.golang.org/grpc/naming"
|
||||
)
|
||||
|
||||
// allow maximum 1 retry per second
|
||||
@ -36,35 +34,46 @@ const resolveRetryRate = 1
|
||||
type clusterProxy struct {
|
||||
clus clientv3.Cluster
|
||||
ctx context.Context
|
||||
gr *naming.GRPCResolver
|
||||
|
||||
// advertise client URL
|
||||
advaddr string
|
||||
prefix string
|
||||
|
||||
em endpoints.Manager
|
||||
|
||||
umu sync.RWMutex
|
||||
umap map[string]gnaming.Update
|
||||
umap map[string]endpoints.Endpoint
|
||||
}
|
||||
|
||||
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
|
||||
// The returned channel is closed when there is grpc-proxy endpoint registered
|
||||
// and the client's context is canceled so the 'register' loop returns.
|
||||
// TODO: Expand the API to report creation errors
|
||||
func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
|
||||
var em endpoints.Manager
|
||||
if advaddr != "" && prefix != "" {
|
||||
var err error
|
||||
if em, err = endpoints.NewManager(c, prefix); err != nil {
|
||||
plog.Errorf("failed to provision endpointsManager %q (%v)", prefix, err)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
cp := &clusterProxy{
|
||||
clus: c.Cluster,
|
||||
ctx: c.Ctx(),
|
||||
gr: &naming.GRPCResolver{Client: c},
|
||||
|
||||
advaddr: advaddr,
|
||||
prefix: prefix,
|
||||
umap: make(map[string]gnaming.Update),
|
||||
umap: make(map[string]endpoints.Endpoint),
|
||||
em: em,
|
||||
}
|
||||
|
||||
donec := make(chan struct{})
|
||||
if advaddr != "" && prefix != "" {
|
||||
if em != nil {
|
||||
go func() {
|
||||
defer close(donec)
|
||||
cp.resolve(prefix)
|
||||
cp.establishEndpointWatch(prefix)
|
||||
}()
|
||||
return cp, donec
|
||||
}
|
||||
@ -73,38 +82,36 @@ func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.Clus
|
||||
return cp, donec
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) resolve(prefix string) {
|
||||
func (cp *clusterProxy) establishEndpointWatch(prefix string) {
|
||||
rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
|
||||
for rm.Wait(cp.ctx) == nil {
|
||||
wa, err := cp.gr.Resolve(prefix)
|
||||
wc, err := cp.em.NewWatchChannel(cp.ctx)
|
||||
if err != nil {
|
||||
plog.Warningf("failed to resolve %q (%v)", prefix, err)
|
||||
plog.Warningf("failed to establish endpoint watch %q (%v)", prefix, err)
|
||||
continue
|
||||
}
|
||||
cp.monitor(wa)
|
||||
cp.monitor(wc)
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
|
||||
for cp.ctx.Err() == nil {
|
||||
ups, err := wa.Next()
|
||||
if err != nil {
|
||||
plog.Warningf("clusterProxy watcher error (%v)", err)
|
||||
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
|
||||
return
|
||||
func (cp *clusterProxy) monitor(wc endpoints.WatchChannel) {
|
||||
for {
|
||||
select {
|
||||
case <-cp.ctx.Done():
|
||||
plog.Info("watching endpoints interrupted (%v)", cp.ctx.Err())
|
||||
return
|
||||
case updates := <-wc:
|
||||
cp.umu.Lock()
|
||||
for _, up := range updates {
|
||||
switch up.Op {
|
||||
case endpoints.Add:
|
||||
cp.umap[up.Endpoint.Addr] = up.Endpoint
|
||||
case endpoints.Delete:
|
||||
delete(cp.umap, up.Endpoint.Addr)
|
||||
}
|
||||
}
|
||||
cp.umu.Unlock()
|
||||
}
|
||||
|
||||
cp.umu.Lock()
|
||||
for i := range ups {
|
||||
switch ups[i].Op {
|
||||
case gnaming.Add:
|
||||
cp.umap[ups[i].Addr] = *ups[i]
|
||||
case gnaming.Delete:
|
||||
delete(cp.umap, ups[i].Addr)
|
||||
}
|
||||
}
|
||||
cp.umu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,10 +20,9 @@ import (
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/clientv3/concurrency"
|
||||
"go.etcd.io/etcd/clientv3/naming"
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
gnaming "google.golang.org/grpc/naming"
|
||||
)
|
||||
|
||||
// allow maximum 1 retry per second
|
||||
@ -67,8 +66,12 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gr := &naming.GRPCResolver{Client: c}
|
||||
if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
|
||||
em, err := endpoints.NewManager(c, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()}
|
||||
if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -19,11 +19,9 @@ import (
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/clientv3/naming"
|
||||
"go.etcd.io/etcd/clientv3/naming/endpoints"
|
||||
"go.etcd.io/etcd/integration"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
|
||||
gnaming "google.golang.org/grpc/naming"
|
||||
)
|
||||
|
||||
func TestRegister(t *testing.T) {
|
||||
@ -35,26 +33,16 @@ func TestRegister(t *testing.T) {
|
||||
paddr := clus.Members[0].GRPCAddr()
|
||||
|
||||
testPrefix := "test-name"
|
||||
wa := createWatcher(t, cli, testPrefix)
|
||||
ups, err := wa.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(ups) != 0 {
|
||||
t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
|
||||
}
|
||||
wa := mustCreateWatcher(t, cli, testPrefix)
|
||||
|
||||
donec := Register(cli, testPrefix, paddr, 5)
|
||||
|
||||
ups, err = wa.Next()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ups := <-wa
|
||||
if len(ups) != 1 {
|
||||
t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups)
|
||||
}
|
||||
if ups[0].Addr != paddr {
|
||||
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr)
|
||||
if ups[0].Endpoint.Addr != paddr {
|
||||
t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr)
|
||||
}
|
||||
|
||||
cli.Close()
|
||||
@ -66,11 +54,14 @@ func TestRegister(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher {
|
||||
gr := &naming.GRPCResolver{Client: c}
|
||||
watcher, err := gr.Resolve(prefix)
|
||||
func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel {
|
||||
em, err := endpoints.NewManager(c, prefix)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create endpoints.Manager: %v", err)
|
||||
}
|
||||
wc, err := em.NewWatchChannel(c.Ctx())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to resolve %q (%v)", prefix, err)
|
||||
}
|
||||
return watcher
|
||||
return wc
|
||||
}
|
||||
|
4
test
4
test
@ -192,7 +192,8 @@ function integration_pass {
|
||||
INTEGTESTPKG=("${REPO_PATH}/integration"
|
||||
"${REPO_PATH}/client/integration"
|
||||
"${REPO_PATH}/clientv3/integration/..."
|
||||
"${REPO_PATH}/contrib/raftexample")
|
||||
"${REPO_PATH}/contrib/raftexample"
|
||||
"${REPO_PATH}/proxy/grpcproxy")
|
||||
else
|
||||
INTEGTESTPKG=("${TEST[@]}")
|
||||
fi
|
||||
@ -205,6 +206,7 @@ function integration_extra {
|
||||
go test -timeout 25m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/clientv3/integration/..."
|
||||
go test -timeout 1m -v -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/contrib/raftexample"
|
||||
go test -timeout 5m -v ${RACE} -tags v2v3 "$@" "${REPO_PATH}/etcdserver/api/v2store"
|
||||
go test -timeout 5m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/proxy/grpcproxy"
|
||||
go test -timeout 1m -v ${RACE} -cpu "${TEST_CPUS}" -run=Example "$@" "${TEST[@]}"
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user