diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go
index 15bc48cbe..8356aaf00 100644
--- a/clientv3/balancer/balancer.go
+++ b/clientv3/balancer/balancer.go
@@ -16,7 +16,9 @@ package balancer
 
 import (
 	"fmt"
+	"strconv"
 	"sync"
+	"time"
 
 	"github.com/coreos/etcd/clientv3/balancer/picker"
 
@@ -50,6 +52,7 @@ type builder struct {
 // Then, resolved addreses will be handled via "HandleResolvedAddrs".
 func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
 	bb := &baseBalancer{
+		id:     strconv.FormatInt(time.Now().UnixNano(), 36),
 		policy: b.cfg.Policy,
 		name:   b.cfg.Policy.String(),
 		lg:     b.cfg.Logger,
@@ -78,6 +81,7 @@ func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balan
 
 	bb.lg.Info(
 		"built balancer",
+		zap.String("balancer-id", bb.id),
 		zap.String("policy", bb.policy.String()),
 		zap.String("resolver-target", cc.Target()),
 	)
@@ -102,6 +106,7 @@ type Balancer interface {
 }
 
 type baseBalancer struct {
+	id     string
 	policy picker.Policy
 	name   string
 	lg     *zap.Logger
@@ -123,10 +128,10 @@ type baseBalancer struct {
 // gRPC sends initial or updated resolved addresses from "Build".
 func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
 	if err != nil {
-		bb.lg.Warn("HandleResolvedAddrs called with error", zap.Error(err))
+		bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
 		return
 	}
-	bb.lg.Info("resolved", zap.Strings("addresses", addrsToStrings(addrs)))
+	bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
 
 	bb.mu.Lock()
 	defer bb.mu.Unlock()
@@ -137,7 +142,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
 		if _, ok := bb.addrToSc[addr]; !ok {
 			sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
 			if err != nil {
-				bb.lg.Warn("NewSubConn failed", zap.Error(err), zap.String("address", addr.Addr))
+				bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
 				continue
 			}
 			bb.addrToSc[addr] = sc
@@ -155,6 +160,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
 
 			bb.lg.Info(
 				"removed subconn",
+				zap.String("balancer-id", bb.id),
 				zap.String("address", addr.Addr),
 				zap.String("subconn", scToString(sc)),
 			)
@@ -176,6 +182,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
 	if !ok {
 		bb.lg.Warn(
 			"state change for an unknown subconn",
+			zap.String("balancer-id", bb.id),
 			zap.String("subconn", scToString(sc)),
 			zap.String("state", s.String()),
 		)
@@ -184,6 +191,7 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
 
 	bb.lg.Info(
 		"state changed",
+		zap.String("balancer-id", bb.id),
 		zap.Bool("connected", s == connectivity.Ready),
 		zap.String("subconn", scToString(sc)),
 		zap.String("address", bb.scToAddr[sc].Addr),
@@ -221,6 +229,11 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti
 
 func (bb *baseBalancer) regeneratePicker() {
 	if bb.currentState == connectivity.TransientFailure {
+		bb.lg.Info(
+			"generated transient error picker",
+			zap.String("balancer-id", bb.id),
+			zap.String("policy", bb.policy.String()),
+		)
 		bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
 		return
 	}
@@ -247,6 +260,7 @@ func (bb *baseBalancer) regeneratePicker() {
 
 	bb.lg.Info(
 		"generated picker",
+		zap.String("balancer-id", bb.id),
 		zap.String("policy", bb.policy.String()),
 		zap.Strings("subconn-ready", scsToStrings(addrToSc)),
 		zap.Int("subconn-size", len(addrToSc)),
diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go
index 9bb6c19ff..75a7e8b74 100644
--- a/clientv3/balancer/resolver/endpoint/endpoint.go
+++ b/clientv3/balancer/resolver/endpoint/endpoint.go
@@ -100,7 +100,6 @@ type Resolver struct {
 	clusterName string
 	cc          resolver.ClientConn
 	addrs       []resolver.Address
-	hostToAddr  map[string]resolver.Address
 	sync.RWMutex
 }
 
@@ -108,7 +107,6 @@ type Resolver struct {
 
 func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
 	r.Lock()
 	r.addrs = addrs
-	r.hostToAddr = keyAddrsByHost(addrs)
 	r.Unlock()
 }
 
@@ -133,37 +131,13 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) {
 }
 
 // NewAddress updates the addresses of the resolver.
-func (r *Resolver) NewAddress(addrs []resolver.Address) error {
-	if r.cc == nil {
-		return fmt.Errorf("resolver not yet built, use InitialAddrs to provide initialization endpoints")
-	}
+func (r *Resolver) NewAddress(addrs []resolver.Address) {
 	r.Lock()
 	r.addrs = addrs
-	r.hostToAddr = keyAddrsByHost(addrs)
 	r.Unlock()
-	r.cc.NewAddress(addrs)
-	return nil
-}
-
-func keyAddrsByHost(addrs []resolver.Address) map[string]resolver.Address {
-	// TODO: etcd may be is running on multiple ports on the same host, what to do? Keep a list of addresses?
-	byHost := make(map[string]resolver.Address, len(addrs))
-	for _, addr := range addrs {
-		_, host, _ := ParseEndpoint(addr.Addr)
-		byHost[host] = addr
+	if r.cc != nil {
+		r.cc.NewAddress(addrs)
 	}
-	return byHost
-}
-
-// Endpoint get the resolver address for the host, if any.
-func (r *Resolver) Endpoint(host string) string {
-	var addr string
-	r.RLock()
-	if a, ok := r.hostToAddr[host]; ok {
-		addr = a.Addr
-	}
-	r.RUnlock()
-	return addr
 }
 
 func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
diff --git a/clientv3/client.go b/clientv3/client.go
index 901ab4fb0..2c5736368 100644
--- a/clientv3/client.go
+++ b/clientv3/client.go
@@ -143,17 +143,15 @@ func (c *Client) Endpoints() (eps []string) {
 
 // SetEndpoints updates client's endpoints.
 func (c *Client) SetEndpoints(eps ...string) {
-	c.mu.Lock()
-	c.cfg.Endpoints = eps
-	c.mu.Unlock()
-
 	var addrs []resolver.Address
 	for _, ep := range eps {
 		addrs = append(addrs, resolver.Address{Addr: ep})
 	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.cfg.Endpoints = eps
 	c.resolver.NewAddress(addrs)
-
 	// TODO: Does the new grpc balancer provide a way to block until the endpoint changes are propagated?
 	/*if c.balancer.NeedUpdate() {
 		select {
@@ -252,9 +250,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
 	}
 	opts = append(opts, dopts...)
 
-	f := func(host string, t time.Duration) (net.Conn, error) {
-		// TODO: eliminate this ParseEndpoint call, the endpoint is already parsed by the resolver.
-		proto, host, _ := endpoint.ParseEndpoint(c.resolver.Endpoint(host))
+	f := func(dialEp string, t time.Duration) (net.Conn, error) {
+		proto, host, _ := endpoint.ParseEndpoint(dialEp)
 		if host == "" && ep != "" {
 			// dialing an endpoint not in the balancer; use
 			// endpoint passed into dial
diff --git a/clientv3/ordering/kv_test.go b/clientv3/ordering/kv_test.go
index 9e884f3b2..ebe802334 100644
--- a/clientv3/ordering/kv_test.go
+++ b/clientv3/ordering/kv_test.go
@@ -82,6 +82,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
 	clus.Members[2].Restart(t)
 	// force OrderingKv to query the third member
 	cli.SetEndpoints(clus.Members[2].GRPCAddr())
+	time.Sleep(2 * time.Second) // FIXME: Figure out how to pause SetEndpoints sufficiently that this is not needed
 	_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
 	if err != errOrderViolation {
 		t.Fatalf("expected %v, got %v", errOrderViolation, err)
@@ -147,7 +148,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
 	clus.Members[2].Restart(t)
 	// force OrderingKv to query the third member
 	cli.SetEndpoints(clus.Members[2].GRPCAddr())
-
+	time.Sleep(2 * time.Second) // FIXME: Figure out how to pause SetEndpoints sufficiently that this is not needed
 	_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
 	if err != errOrderViolation {
 		t.Fatalf("expected %v, got %v", errOrderViolation, err)
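
Note on the balancer-id scheme in the balancer.go hunks: strconv.FormatInt(time.Now().UnixNano(), 36) encodes the nanosecond wall-clock timestamp in base 36, yielding a short token that lets log readers tell concurrently built balancers apart in the zap output. A minimal self-contained sketch of the scheme (illustrative, not part of the patch):

    package main

    import (
        "fmt"
        "strconv"
        "time"
    )

    func main() {
        // Same scheme as baseBalancer.id: nanosecond timestamp rendered in
        // base 36, e.g. a token like "pb1p6yg42wzpq". Two balancers collide
        // only if they are built within the same nanosecond.
        id := strconv.FormatInt(time.Now().UnixNano(), 36)
        fmt.Println("balancer-id:", id)
    }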
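The client.go hunk makes SetEndpoints update c.cfg.Endpoints and notify the resolver inside one critical section, so the stored config and the resolver's address list can no longer diverge mid-update. A usage sketch under the assumption of a reachable cluster; the endpoint URLs below are placeholders:

    package main

    import (
        "log"
        "time"

        "github.com/coreos/etcd/clientv3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   []string{"http://127.0.0.1:2379"}, // placeholder
            DialTimeout: 5 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()

        // Re-point the client at another member. After this patch the config
        // update and the resolver notification happen under the same lock,
        // but propagation to live sub-connections is still asynchronous
        // (hence the sleeps added to the ordering tests).
        cli.SetEndpoints("http://127.0.0.1:22379") // placeholder
    }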
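On the FIXME'd sleeps in kv_test.go: SetEndpoints returns before gRPC has actually switched sub-connections over to the new member, so the tests over-wait with a fixed 2s. One possible alternative, sketched below, is to poll the freshly set endpoint with a deadline instead of sleeping; waitForEndpoint is a hypothetical helper for the ordering test package, not part of the patch:

    package ordering

    import (
        "context"
        "time"

        "github.com/coreos/etcd/clientv3"
    )

    // waitForEndpoint retries a serializable read until the freshly set
    // endpoint starts serving, or until ctx expires. The tests could call
    // this between cli.SetEndpoints(...) and the ordering assertion instead
    // of time.Sleep(2 * time.Second).
    func waitForEndpoint(ctx context.Context, cli *clientv3.Client, key string) error {
        for {
            if _, err := cli.Get(ctx, key, clientv3.WithSerializable()); err == nil {
                return nil
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(100 * time.Millisecond):
            }
        }
    }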