diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index a54e78622..ef40cd33d 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -263,9 +263,28 @@ func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
 	}
 	b.mu.RLock()
 	addrs := b.addrs
+	pinAddr := b.pinAddr
+	downc := b.downc
 	b.mu.RUnlock()
+
+	var waitDown bool
+	if pinAddr != "" {
+		waitDown = true
+		for _, a := range addrs {
+			if a.Addr == pinAddr {
+				waitDown = false
+			}
+		}
+	}
+
 	select {
 	case b.notifyCh <- addrs:
+		if waitDown {
+			select {
+			case <-downc:
+			case <-b.stopc:
+			}
+		}
 	case <-b.stopc:
 	}
 }
diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go
index d1846b151..8f4ba08ae 100644
--- a/clientv3/health_balancer.go
+++ b/clientv3/health_balancer.go
@@ -93,13 +93,8 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
 		// If connected to a black hole endpoint or a killed server, the gRPC ping
 		// timeout will induce a network I/O error, and retrying until success;
 		// finding healthy endpoint on retry could take several timeouts and redials.
 		// To avoid wasting retries, gray-list unhealthy endpoints.
-		hb.mu.Lock()
-		hb.unhealthy[addr.Addr] = time.Now()
-		hb.mu.Unlock()
+		hb.hostPortError(addr.Addr, err)
 		f(err)
-		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
-		}
 	}
 }
@@ -143,13 +138,6 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
 		case <-time.After(timeout):
 			hb.mu.Lock()
 			for k, v := range hb.unhealthy {
-				if _, ok := hb.hostPort2ep[k]; !ok {
-					delete(hb.unhealthy, k)
-					if logger.V(4) {
-						logger.Infof("clientv3/health-balancer: removes stale host:port %q from unhealthy", k)
-					}
-					continue
-				}
 				if time.Since(v) > timeout {
 					delete(hb.unhealthy, k)
 					if logger.V(4) {
@@ -187,11 +175,13 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 
 func (hb *healthBalancer) hostPortError(hostPort string, err error) {
 	hb.mu.Lock()
-	hb.unhealthy[hostPort] = time.Now()
-	hb.mu.Unlock()
-	if logger.V(4) {
-		logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
+	if _, ok := hb.hostPort2ep[hostPort]; ok {
+		hb.unhealthy[hostPort] = time.Now()
+		if logger.V(4) {
+			logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
+		}
 	}
+	hb.mu.Unlock()
 }
 
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
diff --git a/clientv3/retry.go b/clientv3/retry.go
index 2f2af774c..e6d17d032 100644
--- a/clientv3/retry.go
+++ b/clientv3/retry.go
@@ -32,7 +32,7 @@ type retryStopErrFunc func(error) bool
 func isRepeatableStopError(err error) bool {
 	eErr := rpctypes.Error(err)
-	// always stop retry on etcd errors
-	if _, ok := eErr.(rpctypes.EtcdError); ok {
+	// stop retry on etcd errors other than Unavailable
+	if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
 		return true
 	}
 	// only retry if unavailable
@@ -62,11 +62,16 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
 			if logger.V(4) {
 				logger.Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
 			}
-			// mark this before endpoint switch is triggered
-			c.balancer.hostPortError(pinned, err)
-			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+
+			if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
+				// mark this before endpoint switch is triggered
+				c.balancer.hostPortError(pinned, err)
 				c.balancer.next()
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
+				}
 			}
+
			if isStop(err) {
 				return err
 			}
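For reference, the endpoint-switch condition added in retry.go can be modeled in isolation: only Unavailable, DeadlineExceeded, and Internal now gray-list the pinned endpoint and trigger a switch. The sketch below is not part of the patch; isSwitchCode is a hypothetical helper, since the patch inlines the check inside newRetryWrapper.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isSwitchCode mirrors the new condition in newRetryWrapper: only these
// codes mark the pinned endpoint unhealthy and call balancer.next().
// (Hypothetical helper for illustration only.)
func isSwitchCode(err error) bool {
	s, ok := status.FromError(err)
	if !ok {
		return false
	}
	switch s.Code() {
	case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
		return true
	}
	return false
}

func main() {
	fmt.Println(isSwitchCode(status.Error(codes.Unavailable, "connection refused"))) // true: switch endpoints
	fmt.Println(isSwitchCode(status.Error(codes.InvalidArgument, "bad request")))    // false: stay pinned
}
```

Codes such as InvalidArgument fall through: the endpoint stays pinned, and isRepeatableStopError still stops retries on etcd server errors unless the error maps to Unavailable.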
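The balancer.go change is the subtler one: when the pinned address is absent from the newly notified address list, notifyAddrs now blocks until the old connection reports down (downc) or the balancer stops (stopc), so the stale pin is torn down before any further notification is processed. A minimal runnable model of that channel pattern, with assumed names standing in for the simpleBalancer fields:

```go
package main

import (
	"fmt"
	"time"
)

// notify is a trimmed-down model of simpleBalancer.notifyAddrs after this
// patch: if the pinned address is missing from addrs, the sender waits for
// the old connection to go down (downc closes) or for shutdown (stopc)
// before returning.
func notify(notifyCh chan []string, stopc, downc chan struct{}, addrs []string, pinned string) {
	waitDown := pinned != ""
	for _, a := range addrs {
		if a == pinned {
			waitDown = false // pinned address still present; no teardown needed
		}
	}
	select {
	case notifyCh <- addrs:
		if waitDown {
			select {
			case <-downc: // old pinned connection confirmed down
			case <-stopc:
			}
		}
	case <-stopc:
	}
}

func main() {
	notifyCh := make(chan []string, 1)
	stopc := make(chan struct{})
	downc := make(chan struct{})
	go func() { time.Sleep(10 * time.Millisecond); close(downc) }() // simulate teardown
	notify(notifyCh, stopc, downc, []string{"10.0.0.2:2379"}, "10.0.0.1:2379")
	fmt.Println("notified:", <-notifyCh) // printed only after downc closed
}
```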