diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go index 2bce6c8de..c2533e870 100644 --- a/clientv3/balancer/balancer.go +++ b/clientv3/balancer/balancer.go @@ -16,7 +16,6 @@ package balancer import ( "fmt" - "strings" "sync" "github.com/coreos/etcd/clientv3/balancer/picker" @@ -44,9 +43,6 @@ type Balancer interface { // Picker calls "Pick" for every client request. picker.Picker - - // SetEndpoints updates client's endpoints. - SetEndpoints(eps ...string) } type baseBalancer struct { @@ -56,8 +52,6 @@ type baseBalancer struct { mu sync.RWMutex - eps []string - addrToSc map[resolver.Address]balancer.SubConn scToAddr map[balancer.SubConn]resolver.Address scToSt map[balancer.SubConn]connectivity.State @@ -70,20 +64,12 @@ type baseBalancer struct { } // New returns a new balancer from specified picker policy. -func New(cfg Config) (Balancer, error) { - for _, ep := range cfg.Endpoints { - if !strings.HasPrefix(ep, "endpoint://") { - return nil, fmt.Errorf("'endpoint' target schema required for etcd load balancer endpoints but got '%s'", ep) - } - } - +func New(cfg Config) Balancer { bb := &baseBalancer{ policy: cfg.Policy, name: cfg.Policy.String(), lg: cfg.Logger, - eps: cfg.Endpoints, - addrToSc: make(map[resolver.Address]balancer.SubConn), scToAddr: make(map[balancer.SubConn]resolver.Address), scToSt: make(map[balancer.SubConn]connectivity.State), @@ -107,7 +93,7 @@ func New(cfg Config) (Balancer, error) { zap.String("policy", bb.policy.String()), zap.String("name", bb.name), ) - return bb, nil + return bb } // Name implements "grpc/balancer.Builder" interface. @@ -265,15 +251,6 @@ func (bb *baseBalancer) regeneratePicker() { ) } -// SetEndpoints updates client's endpoints. -// TODO: implement this -func (bb *baseBalancer) SetEndpoints(eps ...string) { - addrs := epsToAddrs(eps...) - bb.mu.Lock() - bb.Picker.UpdateAddrs(addrs) - bb.mu.Unlock() -} - // Close implements "grpc/balancer.Balancer" interface. // Close is a nop because base balancer doesn't have internal state to clean up, // and it doesn't need to call RemoveSubConn for the SubConns. diff --git a/clientv3/balancer/balancer_test.go b/clientv3/balancer/balancer_test.go index 80243d963..5de5b5930 100644 --- a/clientv3/balancer/balancer_test.go +++ b/clientv3/balancer/balancer_test.go @@ -68,16 +68,12 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) { rsv.InitialAddrs(resolvedAddrs) cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: genName(), - Logger: zap.NewExample(), - Endpoints: []string{fmt.Sprintf("endpoint://nofailover/*")}, + Policy: picker.RoundrobinBalanced, + Name: genName(), + Logger: zap.NewExample(), } - rrb, err := New(cfg) - if err != nil { - t.Fatalf("failed to create builder: %v", err) - } - conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) + rrb := New(cfg) + conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) if err != nil { t.Fatalf("failed to dial mock server: %v", err) } @@ -134,16 +130,12 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) { rsv.InitialAddrs(resolvedAddrs) cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: genName(), - Logger: zap.NewExample(), - Endpoints: []string{fmt.Sprintf("endpoint://serverfail/mock.server")}, + Policy: picker.RoundrobinBalanced, + Name: genName(), + Logger: zap.NewExample(), } - rrb, err := New(cfg) - if err != nil { - t.Fatalf("failed to create builder: %v", err) - } - conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) + rrb := New(cfg) + conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) if err != nil { t.Fatalf("failed to dial mock server: %s", err) } @@ -251,16 +243,12 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) { rsv.InitialAddrs(resolvedAddrs) cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: genName(), - Logger: zap.NewExample(), - Endpoints: []string{fmt.Sprintf("endpoint://requestfail/mock.server")}, + Policy: picker.RoundrobinBalanced, + Name: genName(), + Logger: zap.NewExample(), } - rrb, err := New(cfg) - if err != nil { - t.Fatalf("failed to create builder: %v", err) - } - conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) + rrb := New(cfg) + conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name())) if err != nil { t.Fatalf("failed to dial mock server: %s", err) } diff --git a/clientv3/balancer/config.go b/clientv3/balancer/config.go index 5c649e220..2156984df 100644 --- a/clientv3/balancer/config.go +++ b/clientv3/balancer/config.go @@ -33,7 +33,4 @@ type Config struct { // Logger configures balancer logging. // If nil, logs are discarded. Logger *zap.Logger - - // Endpoints is a list of server endpoints. - Endpoints []string } diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go index 281f453fc..c70ce158b 100644 --- a/clientv3/balancer/picker/err.go +++ b/clientv3/balancer/picker/err.go @@ -18,7 +18,6 @@ import ( "context" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" ) // NewErr returns a picker that always returns err on "Pick". @@ -33,7 +32,3 @@ type errPicker struct { func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { return nil, nil, p.err } - -func (p *errPicker) UpdateAddrs(addrs []resolver.Address) { - return -} diff --git a/clientv3/balancer/picker/picker.go b/clientv3/balancer/picker/picker.go index 93412e2af..7ea761bdb 100644 --- a/clientv3/balancer/picker/picker.go +++ b/clientv3/balancer/picker/picker.go @@ -16,16 +16,9 @@ package picker import ( "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" ) // Picker defines balancer Picker methods. type Picker interface { balancer.Picker - - // UpdateAddrs updates current endpoints in picker. - // Used when endpoints are updated manually. - // TODO: handle resolver target change - // TODO: handle resolved addresses change - UpdateAddrs(addrs []resolver.Address) } diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go index 9175562a2..b043d572d 100644 --- a/clientv3/balancer/picker/roundrobin_balanced.go +++ b/clientv3/balancer/picker/roundrobin_balanced.go @@ -48,8 +48,6 @@ type rrBalanced struct { addrToSc map[resolver.Address]balancer.SubConn scToAddr map[balancer.SubConn]resolver.Address - - updateAddrs func(addrs []resolver.Address) } // Pick is called for every client request. @@ -92,14 +90,3 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (bala } return sc, doneFunc, nil } - -// UpdateAddrs -// TODO: implement this -func (rb *rrBalanced) UpdateAddrs(addrs []resolver.Address) { - rb.mu.Lock() - // close all resolved sub-connections first - for _, sc := range rb.scs { - sc.UpdateAddresses([]resolver.Address{}) - } - rb.mu.Unlock() -} diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go index 617f11f23..78950e5da 100644 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ b/clientv3/balancer/resolver/endpoint/endpoint.go @@ -165,6 +165,10 @@ func (r *Resolver) Close() { bldr.removeResolver(r) } +func (r *Resolver) Target(endpoint string) string { + return fmt.Sprintf("%s://%s/%s", scheme, r.clusterName, endpoint) +} + // Parse endpoint parses a endpoint of the form (http|https)://*|(unix|unixs)://) and returns a // protocol ('tcp' or 'unix'), host (or filepath if a unix socket) and scheme (http, https, unix, unixs). func ParseEndpoint(endpoint string) (proto string, host string, scheme string) { diff --git a/clientv3/client.go b/clientv3/client.go index 794d15928..4075a6c7f 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -44,8 +44,18 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") ErrOldCluster = errors.New("etcdclient: old cluster version") + + defaultBalancer balancer.Balancer ) +func init() { + defaultBalancer = balancer.New(balancer.Config{ + Policy: picker.RoundrobinBalanced, + Name: fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()), + Logger: zap.NewNop(), // zap.NewExample(), + }) +} + // Client provides and manages an etcd v3 client session. type Client struct { Cluster @@ -112,6 +122,9 @@ func (c *Client) Close() error { if c.conn != nil { return toErr(c.ctx, c.conn.Close()) } + if c.resolver != nil { + c.resolver.Close() + } return c.ctx.Err() } @@ -281,8 +294,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts [] } // Dial connects to a single endpoint using the client's config. -func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { - return c.dial(ep) +func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { + return c.dial(endpoint) } func (c *Client) getToken(ctx context.Context) error { @@ -294,7 +307,7 @@ func (c *Client) getToken(ctx context.Context) error { host := getHost(endpoint) // use dial options without dopts to avoid reusing the client balancer var dOpts []grpc.DialOption - dOpts, err = c.dialSetupOpts(endpoint) + dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint)) if err != nil { continue } @@ -420,28 +433,23 @@ func newClient(cfg *Config) (*Client, error) { client.callOpts = callOpts } - rsv := endpoint.EndpointResolver("default") + clientId := fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36)) + rsv := endpoint.EndpointResolver(clientId) rsv.InitialEndpoints(cfg.Endpoints) - bCfg := balancer.Config{ - Policy: picker.RoundrobinBalanced, - Name: "rrbalancer", - Logger: zap.NewExample(), // zap.NewNop(), - // TODO: these are really "targets", not "endpoints" - Endpoints: []string{fmt.Sprintf("endpoint://default/%s", cfg.Endpoints[0])}, - } - rrb, err := balancer.New(bCfg) - if err != nil { - return nil, err + + targets := []string{} + for _, ep := range cfg.Endpoints { + targets = append(targets, fmt.Sprintf("endpoint://%s/%s", clientId, ep)) } + client.resolver = rsv - client.balancer = rrb + client.balancer = defaultBalancer // TODO: allow alternate balancers to be passed in via config? // use Endpoints[0] so that for https:// without any tls config given, then // grpc will assume the certificate server name is the endpoint host. - conn, err := client.dial(bCfg.Endpoints[0], grpc.WithBalancerName(rrb.Name())) + conn, err := client.dial(targets[0], grpc.WithBalancerName(client.balancer.Name())) if err != nil { client.cancel() - client.balancer.Close() rsv.Close() return nil, err }