[3.4] backport 12669: Implement Endpoint Watch and new Resolver
Signed-off-by: Chao Chen <chaochn@amazon.com>
parent 15d6a11e14
commit 7c4696a7e8
@@ -18,7 +18,7 @@ import (
 
 cli, cerr := clientv3.NewFromURL("http://localhost:2379")
 etcdResolver, err := resolver.NewBuilder(clus.RandClient());
-conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
+conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
 ```
 
 ## Managing service endpoints
@@ -84,4 +84,4 @@ em := endpoints.NewManager(c, "foo")
 err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
     endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
     endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
-```
+```
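For orientation, a minimal sketch of registering and removing one service instance through the `endpoints` package touched by this change. The service prefix `foo/bar/my-service`, the instance key `e1`, the address, and the local etcd URL are illustrative, and the `AddEndpoint`/`DeleteEndpoint` calls assume the `Manager` interface of the upstream endpoints package:

```go
package main

import (
    "context"
    "log"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/naming/endpoints"
)

func main() {
    // Assumes an etcd server reachable at localhost:2379.
    cli, err := clientv3.NewFromURL("http://localhost:2379")
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // The manager is rooted at the service prefix; keys below it identify instances.
    em, err := endpoints.NewManager(cli, "foo/bar/my-service")
    if err != nil {
        log.Fatal(err)
    }

    // Register one instance; the resolver surfaces it as a gRPC address.
    if err := em.AddEndpoint(context.TODO(), "foo/bar/my-service/e1",
        endpoints.Endpoint{Addr: "1.2.3.4:8080"}); err != nil {
        log.Fatal(err)
    }

    // Unregister it again when the instance goes away.
    if err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1"); err != nil {
        log.Fatal(err)
    }
}
```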
@@ -26,18 +26,19 @@ import (
     "time"
 
     "github.com/google/uuid"
-    "go.etcd.io/etcd/clientv3/balancer"
-    "go.etcd.io/etcd/clientv3/balancer/picker"
-    "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
-    "go.etcd.io/etcd/clientv3/credentials"
-    "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
-    "go.etcd.io/etcd/pkg/logutil"
     "go.uber.org/zap"
     "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
     grpccredentials "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/keepalive"
     "google.golang.org/grpc/status"
+
+    "go.etcd.io/etcd/clientv3/balancer"
+    "go.etcd.io/etcd/clientv3/balancer/picker"
+    "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
+    "go.etcd.io/etcd/clientv3/credentials"
+    "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+    "go.etcd.io/etcd/pkg/logutil"
 )
 
 var (
@@ -95,7 +96,8 @@ type Client struct {
 
     callOpts []grpc.CallOption
 
-    lg *zap.Logger
+    lgMu *sync.RWMutex
+    lg *zap.Logger
 }
 
 // New creates a new etcdv3 client from a given configuration.
@@ -112,7 +114,7 @@ func New(cfg Config) (*Client, error) {
 // service interface implementations and do not need connection management.
 func NewCtxClient(ctx context.Context) *Client {
     cctx, cancel := context.WithCancel(ctx)
-    return &Client{ctx: cctx, cancel: cancel}
+    return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
 }
 
 // NewFromURL creates a new etcdv3 client from a URL.
@@ -125,6 +127,23 @@ func NewFromURLs(urls []string) (*Client, error) {
     return New(Config{Endpoints: urls})
 }
 
+// WithLogger sets a logger
+func (c *Client) WithLogger(lg *zap.Logger) *Client {
+    c.lgMu.Lock()
+    c.lg = lg
+    c.lgMu.Unlock()
+    return c
+}
+
+// GetLogger gets the logger.
+// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
+func (c *Client) GetLogger() *zap.Logger {
+    c.lgMu.RLock()
+    l := c.lg
+    c.lgMu.RUnlock()
+    return l
+}
+
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
     c.cancel()
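A minimal sketch of how a caller could attach its own logger via the `WithLogger` method added above; the endpoint address and the choice of `zap.NewProduction()` are illustrative assumptions:

```go
package main

import (
    "log"

    "go.etcd.io/etcd/clientv3"
    "go.uber.org/zap"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // Attach a caller-provided zap logger; library code reads it back via GetLogger().
    lg, err := zap.NewProduction()
    if err != nil {
        log.Fatal(err)
    }
    cli.WithLogger(lg)
}
```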
@@ -423,6 +442,7 @@ func newClient(cfg *Config) (*Client, error) {
         cancel: cancel,
         mu: new(sync.RWMutex),
         callOpts: defaultCallOpts,
+        lgMu: new(sync.RWMutex),
     }
 
     lcfg := logutil.DefaultZapLoggerConfig
@@ -27,8 +27,6 @@ import (
 )
 
 func TestEndpointManager(t *testing.T) {
-    t.Skip("Not implemented yet")
-
     defer testutil.AfterTest(t)
 
     clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
@@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) {
     us := <-w
 
     if us == nil {
-        t.Fatal("failed to get update", err)
+        t.Fatal("failed to get update")
     }
 
-    wu := endpoints.Update{
+    wu := &endpoints.Update{
         Op: endpoints.Add,
         Key: "foo/a1",
         Endpoint: e1,
@@ -69,21 +67,21 @@ func TestEndpointManager(t *testing.T) {
 
     err = em.DeleteEndpoint(context.TODO(), "foo/a1")
     if err != nil {
-        t.Fatalf("failed to udpate %v", err)
+        t.Fatalf("failed to update %v", err)
     }
 
     us = <-w
-    if err != nil {
-        t.Fatalf("failed to get udpate %v", err)
+    if us == nil {
+        t.Fatal("failed to get update")
     }
 
-    wu = endpoints.Update{
+    wu = &endpoints.Update{
         Op: endpoints.Delete,
         Key: "foo/a1",
     }
 
-    if !reflect.DeepEqual(us, wu) {
-        t.Fatalf("up = %#v, want %#v", us[1], wu)
+    if !reflect.DeepEqual(us[0], wu) {
+        t.Fatalf("up = %#v, want %#v", us[0], wu)
     }
 }
@@ -91,8 +89,6 @@ func TestEndpointManager(t *testing.T) {
 // correctly with multiple hosts and correctly receive multiple
 // updates in a single revision.
 func TestEndpointManagerAtomicity(t *testing.T) {
-    t.Skip("Not implemented yet")
-
     defer testutil.AfterTest(t)
 
     clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
@@ -23,8 +23,8 @@ import (
     "google.golang.org/grpc"
     testpb "google.golang.org/grpc/test/grpc_testing"
 
-    etcdnaming "go.etcd.io/etcd/clientv3/naming"
     "go.etcd.io/etcd/clientv3/naming/endpoints"
+    "go.etcd.io/etcd/clientv3/naming/resolver"
     "go.etcd.io/etcd/integration"
     grpctest "go.etcd.io/etcd/pkg/grpc_testing"
     "go.etcd.io/etcd/pkg/testutil"
@@ -63,10 +63,12 @@ func TestEtcdGrpcResolver(t *testing.T) {
     if err != nil {
         t.Fatal("failed to add foo", err)
     }
-    r := &etcdnaming.GRPCResolver{Client: clus.Client(1)}
-    b := grpc.RoundRobin(r)
 
-    conn, err := grpc.Dial("foo", grpc.WithInsecure(), grpc.WithBalancer(b))
+    b, err := resolver.NewBuilder(clus.Client(1))
+    if err != nil {
+        t.Fatal("failed to new resolver builder", err)
+    }
+    conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
     if err != nil {
         t.Fatal("failed to connect to foo", err)
     }
@@ -106,7 +108,6 @@ func TestEtcdGrpcResolver(t *testing.T) {
         }
         t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
     }
-
     break
 }
 }
@@ -53,7 +53,7 @@ type Update struct {
 }
 
 // WatchChannel is used to deliver notifications about endpoints updates.
-type WatchChannel chan []*Update
+type WatchChannel <-chan []*Update
 
 // Key2EndpointMap maps etcd key into struct describing the endpoint.
 type Key2EndpointMap map[string]Endpoint
@@ -20,9 +20,9 @@ import (
     "context"
     "encoding/json"
     "errors"
-    "fmt"
     "strings"
 
+    "go.uber.org/zap"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
@@ -89,73 +89,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
 }
 
 func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
-    return nil, fmt.Errorf("Not implemented yet")
+    resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
+    if err != nil {
+        return nil, err
+    }
 
-    // TODO: Implementation to be inspired by:
-    // Next gets the next set of updates from the etcd resolver.
-    //// Calls to Next should be serialized; concurrent calls are not safe since
-    //// there is no way to reconcile the update ordering.
-    //func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
-    //    if gw.wch == nil {
-    //        // first Next() returns all addresses
-    //        return gw.firstNext()
-    //    }
-    //    if gw.err != nil {
-    //        return nil, gw.err
-    //    }
-    //
-    //    // process new events on target/*
-    //    wr, ok := <-gw.wch
-    //    if !ok {
-    //        gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
-    //        return nil, gw.err
-    //    }
-    //    if gw.err = wr.Err(); gw.err != nil {
-    //        return nil, gw.err
-    //    }
-    //
-    //    updates := make([]*naming.Update, 0, len(wr.Events))
-    //    for _, e := range wr.Events {
-    //        var jupdate naming.Update
-    //        var err error
-    //        switch e.Type {
-    //        case etcd.EventTypePut:
-    //            err = json.Unmarshal(e.Kv.Value, &jupdate)
-    //            jupdate.Op = naming.Add
-    //        case etcd.EventTypeDelete:
-    //            err = json.Unmarshal(e.PrevKv.Value, &jupdate)
-    //            jupdate.Op = naming.Delete
-    //        default:
-    //            continue
-    //        }
-    //        if err == nil {
-    //            updates = append(updates, &jupdate)
-    //        }
-    //    }
-    //    return updates, nil
-    //}
-    //
-    //func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
-    //    // Use serialized request so resolution still works if the target etcd
-    //    // server is partitioned away from the quorum.
-    //    resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
-    //    if gw.err = err; err != nil {
-    //        return nil, err
-    //    }
-    //
-    //    updates := make([]*naming.Update, 0, len(resp.Kvs))
-    //    for _, kv := range resp.Kvs {
-    //        var jupdate naming.Update
-    //        if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
-    //            continue
-    //        }
-    //        updates = append(updates, &jupdate)
-    //    }
-    //
-    //    opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
-    //    gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
-    //    return updates, nil
-    //}
+    lg := m.client.GetLogger()
+    initUpdates := make([]*Update, 0, len(resp.Kvs))
+    for _, kv := range resp.Kvs {
+        var iup internal.Update
+        if err := json.Unmarshal(kv.Value, &iup); err != nil {
+            lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
+            continue
+        }
+        up := &Update{
+            Op: Add,
+            Key: string(kv.Key),
+            Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
+        }
+        initUpdates = append(initUpdates, up)
+    }
+
+    upch := make(chan []*Update, 1)
+    if len(initUpdates) > 0 {
+        upch <- initUpdates
+    }
+    go m.watch(ctx, resp.Header.Revision+1, upch)
+    return upch, nil
 }
+
+func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
+    defer close(upch)
+
+    lg := m.client.GetLogger()
+    opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
+    wch := m.client.Watch(ctx, m.target, opts...)
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case wresp, ok := <-wch:
+            if !ok {
+                lg.Warn("watch closed", zap.String("target", m.target))
+                return
+            }
+            if wresp.Err() != nil {
+                lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
+                return
+            }
+
+            deltaUps := make([]*Update, 0, len(wresp.Events))
+            for _, e := range wresp.Events {
+                var iup internal.Update
+                var err error
+                var op Operation
+                switch e.Type {
+                case clientv3.EventTypePut:
+                    err = json.Unmarshal(e.Kv.Value, &iup)
+                    op = Add
+                    if err != nil {
+                        lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
+                        continue
+                    }
+                case clientv3.EventTypeDelete:
+                    iup = internal.Update{Op: internal.Delete}
+                    op = Delete
+                default:
+                    continue
+                }
+                up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
+                deltaUps = append(deltaUps, up)
+            }
+            if len(deltaUps) > 0 {
+                upch <- deltaUps
+            }
+        }
+    }
+}
 
 func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
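A minimal sketch of consuming the watch channel returned by the new `NewWatchChannel`: the first message carries the endpoints already stored under the prefix, later messages carry the Add/Delete deltas produced by the `watch` goroutine above. The cluster URL and service prefix are illustrative:

```go
package main

import (
    "context"
    "log"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/naming/endpoints"
)

func main() {
    cli, err := clientv3.NewFromURL("http://localhost:2379")
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    em, err := endpoints.NewManager(cli, "foo/bar/my-service")
    if err != nil {
        log.Fatal(err)
    }

    // Cancelling the context stops the underlying etcd watch and closes the channel.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    wch, err := em.NewWatchChannel(ctx)
    if err != nil {
        log.Fatal(err)
    }
    for ups := range wch {
        for _, up := range ups {
            log.Printf("op=%v key=%s addr=%s", up.Op, up.Key, up.Endpoint.Addr)
        }
    }
}
```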
@@ -15,25 +15,107 @@
 package resolver
 
 import (
-    "google.golang.org/grpc/resolver"
+    "context"
+    "sync"
+
+    "google.golang.org/grpc/codes"
+    gresolver "google.golang.org/grpc/resolver"
+    "google.golang.org/grpc/status"
 
     "go.etcd.io/etcd/clientv3"
+    "go.etcd.io/etcd/clientv3/naming/endpoints"
 )
 
 type builder struct {
-    // ...
+    c *clientv3.Client
 }
 
-func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
-    // To be implemented...
-    // Using endpoints.NewWatcher() to subscribe for endpoints changes.
-    return nil, nil
+func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
+    r := &resolver{
+        c: b.c,
+        target: target.Endpoint,
+        cc: cc,
+    }
+    r.ctx, r.cancel = context.WithCancel(context.Background())
+
+    em, err := endpoints.NewManager(r.c, r.target)
+    if err != nil {
+        return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err)
+    }
+    r.wch, err = em.NewWatchChannel(r.ctx)
+    if err != nil {
+        return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channel: %s", err)
+    }
+
+    r.wg.Add(1)
+    go r.watch()
+    return r, nil
 }
 
 func (b builder) Scheme() string {
     return "etcd"
 }
 
-func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
-    return builder{}, nil
+// NewBuilder creates a resolver builder.
+func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) {
+    return builder{c: client}, nil
 }
+
+type resolver struct {
+    c *clientv3.Client
+    target string
+    cc gresolver.ClientConn
+    wch endpoints.WatchChannel
+    ctx context.Context
+    cancel context.CancelFunc
+    wg sync.WaitGroup
+}
+
+func (r *resolver) watch() {
+    defer r.wg.Done()
+
+    allUps := make(map[string]*endpoints.Update)
+    for {
+        select {
+        case <-r.ctx.Done():
+            return
+        case ups, ok := <-r.wch:
+            if !ok {
+                return
+            }
+
+            for _, up := range ups {
+                switch up.Op {
+                case endpoints.Add:
+                    allUps[up.Key] = up
+                case endpoints.Delete:
+                    delete(allUps, up.Key)
+                }
+            }
+
+            addrs := convertToGRPCAddress(allUps)
+            r.cc.UpdateState(gresolver.State{Addresses: addrs})
+        }
+    }
+}
+
+func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address {
+    var addrs []gresolver.Address
+    for _, up := range ups {
+        addr := gresolver.Address{
+            Addr: up.Endpoint.Addr,
+            Metadata: up.Endpoint.Metadata,
+        }
+        addrs = append(addrs, addr)
+    }
+    return addrs
+}
+
+// ResolveNow is a no-op here.
+// It's just a hint, resolver can ignore this if it's not necessary.
+func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {}
+
+func (r *resolver) Close() {
+    r.cancel()
+    r.wg.Wait()
+}
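Tying the resolver and endpoint manager together, a minimal client-side sketch that dials through the new builder; the target name and etcd URL are illustrative and mirror the documentation snippet at the top of this change:

```go
package main

import (
    "log"

    "google.golang.org/grpc"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/naming/resolver"
)

func main() {
    cli, err := clientv3.NewFromURL("http://localhost:2379")
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // The builder watches "foo/bar/my-service" (the part after "etcd:///")
    // and feeds address updates to the gRPC connection.
    etcdResolver, err := resolver.NewBuilder(cli)
    if err != nil {
        log.Fatal(err)
    }
    conn, err := grpc.Dial("etcd:///foo/bar/my-service",
        grpc.WithInsecure(), grpc.WithResolvers(etcdResolver))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    // conn can now back generated service stubs, e.g. pb.NewMyServiceClient(conn).
}
```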