From 3aa5711dca702229ee79d26e923112fa61fb0bd1 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 19 Mar 2018 14:07:33 -0700 Subject: [PATCH] clientv3: move health balancer to "balancer" Signed-off-by: Gyuho Lee --- clientv3/balancer/doc.go | 16 ++ .../grpc1.7-health.go} | 164 +++++++++++------- .../grpc1.7-health_test.go} | 19 +- clientv3/client.go | 33 ++-- clientv3/retry.go | 8 +- 5 files changed, 152 insertions(+), 88 deletions(-) create mode 100644 clientv3/balancer/doc.go rename clientv3/{health_balancer.go => balancer/grpc1.7-health.go} (72%) rename clientv3/{health_balancer_test.go => balancer/grpc1.7-health_test.go} (90%) diff --git a/clientv3/balancer/doc.go b/clientv3/balancer/doc.go new file mode 100644 index 000000000..45af5e9d1 --- /dev/null +++ b/clientv3/balancer/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package balancer implements client balancer. +package balancer diff --git a/clientv3/health_balancer.go b/clientv3/balancer/grpc1.7-health.go similarity index 72% rename from clientv3/health_balancer.go rename to clientv3/balancer/grpc1.7-health.go index ce447c7e9..7d24b93f6 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/balancer/grpc1.7-health.go @@ -1,4 +1,4 @@ -// Copyright 2017 The etcd Authors +// Copyright 2018 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package clientv3 +package balancer import ( "context" "errors" + "io/ioutil" "net/url" "strings" "sync" @@ -24,10 +25,14 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" ) +// TODO: replace with something better +var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard) + const ( minHealthRetryDuration = 3 * time.Second unknownService = "unknown service grpc.health.v1.Health" @@ -38,18 +43,16 @@ const ( // This error is returned only when opts.BlockingWait is true. var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available") -type healthCheckFunc func(ep string) (bool, error) - -type notifyMsg int +type NotifyMsg int const ( - notifyReset notifyMsg = iota - notifyNext + NotifyReset NotifyMsg = iota + NotifyNext ) -// healthBalancer does the bare minimum to expose multiple eps +// GRPC17Health does the bare minimum to expose multiple eps // to the grpc reconnection code path -type healthBalancer struct { +type GRPC17Health struct { // addrs are the client's endpoint addresses for grpc addrs []grpc.Address @@ -64,7 +67,7 @@ type healthBalancer struct { readyOnce sync.Once // healthCheck checks an endpoint's health. - healthCheck healthCheckFunc + healthCheck func(ep string) (bool, error) healthCheckTimeout time.Duration unhealthyMu sync.RWMutex @@ -88,7 +91,7 @@ type healthBalancer struct { donec chan struct{} // updateAddrsC notifies updateNotifyLoop to update addrs. - updateAddrsC chan notifyMsg + updateAddrsC chan NotifyMsg // grpc issues TLS cert checks using the string passed into dial so // that string must be the host. To recover the full scheme://host URL, @@ -102,21 +105,29 @@ type healthBalancer struct { closed bool } -func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *healthBalancer { +// DialFunc defines gRPC dial function. +type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) + +// NewGRPC17Health returns a new health balancer with gRPC v1.7. +func NewGRPC17Health( + eps []string, + timeout time.Duration, + dialFunc DialFunc, +) *GRPC17Health { notifyCh := make(chan []grpc.Address) addrs := eps2addrs(eps) - hb := &healthBalancer{ + hb := &GRPC17Health{ addrs: addrs, eps: eps, notifyCh: notifyCh, readyc: make(chan struct{}), - healthCheck: hc, + healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) }, unhealthyHostPorts: make(map[string]time.Time), upc: make(chan struct{}), stopc: make(chan struct{}), downc: make(chan struct{}), donec: make(chan struct{}), - updateAddrsC: make(chan notifyMsg), + updateAddrsC: make(chan NotifyMsg), hostPort2ep: getHostPort2ep(eps), } if timeout < minHealthRetryDuration { @@ -134,78 +145,81 @@ func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) return hb } -func (b *healthBalancer) Start(target string, config grpc.BalancerConfig) error { return nil } +func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil } -func (b *healthBalancer) ConnectNotify() <-chan struct{} { +func (b *GRPC17Health) ConnectNotify() <-chan struct{} { b.mu.Lock() defer b.mu.Unlock() return b.upc } -func (b *healthBalancer) ready() <-chan struct{} { return b.readyc } +func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC } +func (b *GRPC17Health) StopC() chan struct{} { return b.stopc } -func (b *healthBalancer) endpoint(hostPort string) string { +func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc } + +func (b *GRPC17Health) Endpoint(hostPort string) string { b.mu.RLock() defer b.mu.RUnlock() return b.hostPort2ep[hostPort] } -func (b *healthBalancer) pinned() string { +func (b *GRPC17Health) Pinned() string { b.mu.RLock() defer b.mu.RUnlock() return b.pinAddr } -func (b *healthBalancer) hostPortError(hostPort string, err error) { - if b.endpoint(hostPort) == "" { - lg.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error()) +func (b *GRPC17Health) HostPortError(hostPort string, err error) { + if b.Endpoint(hostPort) == "" { + lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error()) return } b.unhealthyMu.Lock() b.unhealthyHostPorts[hostPort] = time.Now() b.unhealthyMu.Unlock() - lg.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error()) + lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error()) } -func (b *healthBalancer) removeUnhealthy(hostPort, msg string) { - if b.endpoint(hostPort) == "" { - lg.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg) +func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) { + if b.Endpoint(hostPort) == "" { + lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg) return } b.unhealthyMu.Lock() delete(b.unhealthyHostPorts, hostPort) b.unhealthyMu.Unlock() - lg.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg) + lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg) } -func (b *healthBalancer) countUnhealthy() (count int) { +func (b *GRPC17Health) countUnhealthy() (count int) { b.unhealthyMu.RLock() count = len(b.unhealthyHostPorts) b.unhealthyMu.RUnlock() return count } -func (b *healthBalancer) isUnhealthy(hostPort string) (unhealthy bool) { +func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) { b.unhealthyMu.RLock() _, unhealthy = b.unhealthyHostPorts[hostPort] b.unhealthyMu.RUnlock() return unhealthy } -func (b *healthBalancer) cleanupUnhealthy() { +func (b *GRPC17Health) cleanupUnhealthy() { b.unhealthyMu.Lock() for k, v := range b.unhealthyHostPorts { if time.Since(v) > b.healthCheckTimeout { delete(b.unhealthyHostPorts, k) - lg.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout) + lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout) } } b.unhealthyMu.Unlock() } -func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) { +func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) { unhealthyCnt := b.countUnhealthy() b.mu.RLock() @@ -231,15 +245,15 @@ func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) { return addrs, liveHostPorts } -func (b *healthBalancer) updateUnhealthy() { +func (b *GRPC17Health) updateUnhealthy() { for { select { case <-time.After(b.healthCheckTimeout): b.cleanupUnhealthy() - pinned := b.pinned() + pinned := b.Pinned() if pinned == "" || b.isUnhealthy(pinned) { select { - case b.updateAddrsC <- notifyNext: + case b.updateAddrsC <- NotifyNext: case <-b.stopc: return } @@ -250,7 +264,19 @@ func (b *healthBalancer) updateUnhealthy() { } } -func (b *healthBalancer) updateAddrs(eps ...string) { +// NeedUpdate returns true if all connections are down or +// addresses do not include current pinned address. +func (b *GRPC17Health) NeedUpdate() bool { + // updating notifyCh can trigger new connections, + // need update addrs if all connections are down + // or addrs does not include pinAddr. + b.mu.RLock() + update := !hasAddr(b.addrs, b.pinAddr) + b.mu.RUnlock() + return update +} + +func (b *GRPC17Health) UpdateAddrs(eps ...string) { np := getHostPort2ep(eps) b.mu.Lock() @@ -278,12 +304,12 @@ func (b *healthBalancer) updateAddrs(eps ...string) { b.unhealthyMu.Unlock() } -func (b *healthBalancer) next() { +func (b *GRPC17Health) Next() { b.mu.RLock() downc := b.downc b.mu.RUnlock() select { - case b.updateAddrsC <- notifyNext: + case b.updateAddrsC <- NotifyNext: case <-b.stopc: } // wait until disconnect so new RPCs are not issued on old connection @@ -293,7 +319,7 @@ func (b *healthBalancer) next() { } } -func (b *healthBalancer) updateNotifyLoop() { +func (b *GRPC17Health) updateNotifyLoop() { defer close(b.donec) for { @@ -320,7 +346,7 @@ func (b *healthBalancer) updateNotifyLoop() { default: } case downc == nil: - b.notifyAddrs(notifyReset) + b.notifyAddrs(NotifyReset) select { case <-upc: case msg := <-b.updateAddrsC: @@ -338,7 +364,7 @@ func (b *healthBalancer) updateNotifyLoop() { } select { case <-downc: - b.notifyAddrs(notifyReset) + b.notifyAddrs(NotifyReset) case msg := <-b.updateAddrsC: b.notifyAddrs(msg) case <-b.stopc: @@ -348,8 +374,8 @@ func (b *healthBalancer) updateNotifyLoop() { } } -func (b *healthBalancer) notifyAddrs(msg notifyMsg) { - if msg == notifyNext { +func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) { + if msg == NotifyNext { select { case b.notifyCh <- []grpc.Address{}: case <-b.stopc: @@ -380,7 +406,7 @@ func (b *healthBalancer) notifyAddrs(msg notifyMsg) { } } -func (b *healthBalancer) Up(addr grpc.Address) func(error) { +func (b *GRPC17Health) Up(addr grpc.Address) func(error) { if !b.mayPin(addr) { return func(err error) {} } @@ -402,7 +428,7 @@ func (b *healthBalancer) Up(addr grpc.Address) func(error) { } if b.pinAddr != "" { - lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) + lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) return func(err error) {} } @@ -410,7 +436,7 @@ func (b *healthBalancer) Up(addr grpc.Address) func(error) { close(b.upc) b.downc = make(chan struct{}) b.pinAddr = addr.Addr - lg.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr) + lg.Infof("clientv3/balancer: pin %q", addr.Addr) // notify client that a connection is up b.readyOnce.Do(func() { close(b.readyc) }) @@ -420,19 +446,19 @@ func (b *healthBalancer) Up(addr grpc.Address) func(error) { // timeout will induce a network I/O error, and retrying until success; // finding healthy endpoint on retry could take several timeouts and redials. // To avoid wasting retries, gray-list unhealthy endpoints. - b.hostPortError(addr.Addr, err) + b.HostPortError(addr.Addr, err) b.mu.Lock() b.upc = make(chan struct{}) close(b.downc) b.pinAddr = "" b.mu.Unlock() - lg.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) + lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) } } -func (b *healthBalancer) mayPin(addr grpc.Address) bool { - if b.endpoint(addr.Addr) == "" { // stale host:port +func (b *GRPC17Health) mayPin(addr grpc.Address) bool { + if b.Endpoint(addr.Addr) == "" { // stale host:port return false } @@ -454,7 +480,7 @@ func (b *healthBalancer) mayPin(addr grpc.Address) bool { // 3. grpc-healthcheck still SERVING, thus retry to pin // instead, return before grpc-healthcheck if failed within healthcheck timeout if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout { - lg.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout) + lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout) return false } @@ -463,11 +489,11 @@ func (b *healthBalancer) mayPin(addr grpc.Address) bool { return true } - b.hostPortError(addr.Addr, errors.New("health check failed")) + b.HostPortError(addr.Addr, errors.New("health check failed")) return false } -func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { +func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { var ( addr string closed bool @@ -515,9 +541,9 @@ func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) return grpc.Address{Addr: addr}, func() {}, nil } -func (b *healthBalancer) Notify() <-chan []grpc.Address { return b.notifyCh } +func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh } -func (b *healthBalancer) Close() error { +func (b *GRPC17Health) Close() error { b.mu.Lock() // In case gRPC calls close twice. TODO: remove the checking // when we are sure that gRPC wont call close twice. @@ -553,8 +579,8 @@ func (b *healthBalancer) Close() error { return nil } -func grpcHealthCheck(client *Client, ep string) (bool, error) { - conn, err := client.dial(ep) +func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) { + conn, err := dialFunc(ep) if err != nil { return false, err } @@ -607,3 +633,25 @@ func getHostPort2ep(eps []string) map[string]string { } return hm } + +func parseEndpoint(endpoint string) (proto string, host string, scheme string) { + proto = "tcp" + host = endpoint + url, uerr := url.Parse(endpoint) + if uerr != nil || !strings.Contains(endpoint, "://") { + return proto, host, scheme + } + scheme = url.Scheme + + // strip scheme:// prefix since grpc dials by host + host = url.Host + switch url.Scheme { + case "http", "https": + case "unix", "unixs": + proto = "unix" + host = url.Host + url.Path + default: + proto, host = "", "" + } + return proto, host, scheme +} diff --git a/clientv3/health_balancer_test.go b/clientv3/balancer/grpc1.7-health_test.go similarity index 90% rename from clientv3/health_balancer_test.go rename to clientv3/balancer/grpc1.7-health_test.go index dc9d28d0e..bf139a5b1 100644 --- a/clientv3/health_balancer_test.go +++ b/clientv3/balancer/grpc1.7-health_test.go @@ -1,4 +1,4 @@ -// Copyright 2017 The etcd Authors +// Copyright 2018 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package clientv3 +package balancer import ( "context" @@ -31,10 +31,10 @@ import ( var endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} func TestBalancerGetUnblocking(t *testing.T) { - hb := newHealthBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) + hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) defer hb.Close() if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { - t.Errorf("Initialize newHealthBalancer should have triggered Notify() chan, but it didn't") + t.Errorf("Initialize NewGRPC17Health should have triggered Notify() chan, but it didn't") } unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false} @@ -75,10 +75,10 @@ func TestBalancerGetUnblocking(t *testing.T) { } func TestBalancerGetBlocking(t *testing.T) { - hb := newHealthBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) + hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) defer hb.Close() if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { - t.Errorf("Initialize newHealthBalancer should have triggered Notify() chan, but it didn't") + t.Errorf("Initialize NewGRPC17Health should have triggered Notify() chan, but it didn't") } blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} @@ -166,15 +166,14 @@ func TestHealthBalancerGraylist(t *testing.T) { }() } - tf := func(s string) (bool, error) { return false, nil } - hb := newHealthBalancer(eps, 5*time.Second, tf) + hb := NewGRPC17Health(eps, 5*time.Second, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) testutil.AssertNil(t, err) defer conn.Close() kvc := pb.NewKVClient(conn) - <-hb.ready() + <-hb.Ready() kvc.Range(context.TODO(), &pb.RangeRequest{}) ep1 := <-connc @@ -200,7 +199,7 @@ func TestBalancerDoNotBlockOnClose(t *testing.T) { defer kcl.close() for i := 0; i < 5; i++ { - hb := newHealthBalancer(kcl.endpoints(), minHealthRetryDuration, func(string) (bool, error) { return true, nil }) + hb := NewGRPC17Health(kcl.endpoints(), minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) if err != nil { t.Fatal(err) diff --git a/clientv3/client.go b/clientv3/client.go index 01a93f5a3..b86931ac5 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/coreos/etcd/clientv3/balancer" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "google.golang.org/grpc" @@ -55,7 +56,7 @@ type Client struct { cfg Config creds *credentials.TransportCredentials - balancer *healthBalancer + balancer *balancer.GRPC17Health mu *sync.Mutex ctx context.Context @@ -122,18 +123,12 @@ func (c *Client) SetEndpoints(eps ...string) { c.mu.Lock() c.cfg.Endpoints = eps c.mu.Unlock() - c.balancer.updateAddrs(eps...) + c.balancer.UpdateAddrs(eps...) - // updating notifyCh can trigger new connections, - // need update addrs if all connections are down - // or addrs does not include pinAddr. - c.balancer.mu.RLock() - update := !hasAddr(c.balancer.addrs, c.balancer.pinAddr) - c.balancer.mu.RUnlock() - if update { + if c.balancer.NeedUpdate() { select { - case c.balancer.updateAddrsC <- notifyNext: - case <-c.balancer.stopc: + case c.balancer.UpdateAddrsC() <- balancer.NotifyNext: + case <-c.balancer.StopC(): } } } @@ -245,7 +240,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts opts = append(opts, dopts...) f := func(host string, t time.Duration) (net.Conn, error) { - proto, host, _ := parseEndpoint(c.balancer.endpoint(host)) + proto, host, _ := parseEndpoint(c.balancer.Endpoint(host)) if host == "" && endpoint != "" { // dialing an endpoint not in the balancer; use // endpoint passed into dial @@ -412,9 +407,7 @@ func newClient(cfg *Config) (*Client, error) { client.callOpts = callOpts } - client.balancer = newHealthBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) { - return grpcHealthCheck(client, ep) - }) + client.balancer = balancer.NewGRPC17Health(cfg.Endpoints, cfg.DialTimeout, client.dial) // use Endpoints[0] so that for https:// without any tls config given, then // grpc will assume the certificate server name is the endpoint host. @@ -431,7 +424,7 @@ func newClient(cfg *Config) (*Client, error) { hasConn := false waitc := time.After(cfg.DialTimeout) select { - case <-client.balancer.ready(): + case <-client.balancer.Ready(): hasConn = true case <-ctx.Done(): case <-waitc: @@ -561,3 +554,11 @@ func canceledByCaller(stopCtx context.Context, err error) bool { return err == context.Canceled || err == context.DeadlineExceeded } + +func getHost(ep string) string { + url, uerr := url.Parse(ep) + if uerr != nil || !strings.Contains(ep, "://") { + return ep + } + return url.Host +} diff --git a/clientv3/retry.go b/clientv3/retry.go index 13b5b9d67..6226d787e 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -91,7 +91,7 @@ func (c *Client) newRetryWrapper() retryRPCFunc { if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil { return err } - pinned := c.balancer.pinned() + pinned := c.balancer.Pinned() err := f(rpcCtx) if err == nil { return nil @@ -100,8 +100,8 @@ func (c *Client) newRetryWrapper() retryRPCFunc { if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) { // mark this before endpoint switch is triggered - c.balancer.hostPortError(pinned, err) - c.balancer.next() + c.balancer.HostPortError(pinned, err) + c.balancer.Next() lg.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error()) } @@ -115,7 +115,7 @@ func (c *Client) newRetryWrapper() retryRPCFunc { func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { for { - pinned := c.balancer.pinned() + pinned := c.balancer.Pinned() err := retryf(rpcCtx, f, rp) if err == nil { return nil