From fbed568b6ab4af24e9389ed2006df87055a1e0cc Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Sat, 7 Oct 2017 06:22:56 -0700 Subject: [PATCH] clientv3/balancer: mark partitioned member as unhealthy Previous behavior is when server returns errors, retry wrapper does not do anything, while passively expecting balancer to gray-list the isolated endpoint. This is problematic when multiple endpoints are passed, and network partition happens. This patch adds 'endpointError' method to 'balancer' interface to actively(possibly even before health-check API gets called) handle RPC errors and gray-list endpoints for the time being, thus speeding up the endpoint switch. This is safe in a single-endpoint case, because balancer will retry no matter what in such case. Signed-off-by: Gyu-Ho Lee --- clientv3/balancer.go | 4 ++++ clientv3/health_balancer.go | 9 +++++++++ clientv3/retry.go | 2 ++ 3 files changed, 15 insertions(+) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index 35d8b3a4a..cf7419b54 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -44,6 +44,8 @@ type balancer interface { endpoints() []string // pinned returns the current pinned endpoint. pinned() string + // endpointError handles error from server-side. + endpointError(addr string, err error) // up is Up but includes whether the balancer will use the connection. up(addr grpc.Address) (func(error), bool) @@ -150,6 +152,8 @@ func (b *simpleBalancer) pinned() string { return b.pinAddr } +func (b *simpleBalancer) endpointError(addr string, err error) { return } + func getHost2ep(eps []string) map[string]string { hm := make(map[string]string, len(eps)) for i := range eps { diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go index f3be1b64e..bac9a69f6 100644 --- a/clientv3/health_balancer.go +++ b/clientv3/health_balancer.go @@ -177,6 +177,15 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address { return addrs } +func (hb *healthBalancer) endpointError(addr string, err error) { + hb.mu.Lock() + hb.unhealthy[addr] = time.Now() + hb.mu.Unlock() + if logger.V(4) { + logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err) + } +} + func (hb *healthBalancer) mayPin(addr grpc.Address) bool { hb.mu.RLock() skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 diff --git a/clientv3/retry.go b/clientv3/retry.go index d6681441e..d33fff92d 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -66,6 +66,8 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { if logger.V(4) { logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned) } + // mark this before endpoint switch is triggered + c.balancer.endpointError(pinned, err) notify := c.balancer.ConnectNotify() if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { c.balancer.next()