From 1704443c6de8e42c0294fe8f9e9e42f0a0b4245d Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Sat, 7 Oct 2017 06:16:40 -0700
Subject: [PATCH 1/5] clientv3: only health-check when timeout elapses since last failure

Otherwise, a network-partitioned member with an active health-check
server would not be gray-listed, leaving the health balancer stuck
with the isolated endpoint.

Also clarifies some log messages.

Signed-off-by: Gyu-Ho Lee
---
 clientv3/health_balancer.go | 27 ++++++++++++++++++++++-----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go
index ff9cc010c..f3be1b64e 100644
--- a/clientv3/health_balancer.go
+++ b/clientv3/health_balancer.go
@@ -36,7 +36,8 @@ type healthBalancer struct {
 	balancer
 
 	// healthCheck checks an endpoint's health.
-	healthCheck healthCheckFunc
+	healthCheck        healthCheckFunc
+	healthCheckTimeout time.Duration
 
 	// mu protects addrs, eps, unhealthy map, and stopc.
 	mu sync.RWMutex
@@ -71,6 +72,7 @@ func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *h
 	if timeout < minHealthRetryDuration {
 		timeout = minHealthRetryDuration
 	}
+	hb.healthCheckTimeout = timeout
 
 	hb.wg.Add(1)
 	go func() {
@@ -95,6 +97,9 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
 		hb.unhealthy[addr.Addr] = time.Now()
 		hb.mu.Unlock()
 		f(err)
+		if logger.V(4) {
+			logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
+		}
 	}
 }
 
@@ -140,7 +145,7 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
 		if time.Since(v) > timeout {
 			delete(hb.unhealthy, k)
 			if logger.V(4) {
-				logger.Infof("clientv3/balancer: removes %s from unhealthy after %v", k, timeout)
+				logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
 			}
 		}
 	}
@@ -175,17 +180,29 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.mu.RLock()
 	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
-	_, bad := hb.unhealthy[addr.Addr]
+	failedTime, bad := hb.unhealthy[addr.Addr]
+	dur := hb.healthCheckTimeout
 	hb.mu.RUnlock()
 	if skip || !bad {
 		return true
 	}
+	// prevent isolated member's endpoint from being infinitely retried, as follows:
+	// 1. keepalive pings detect GoAway with http2.ErrCodeEnhanceYourCalm
+	// 2. balancer 'Up' unpins with 'grpc: failed with network I/O error'
+	// 3. grpc-healthcheck still SERVING, thus retry to pin
+	// instead, return before grpc-healthcheck if failed within healthcheck timeout
+	if elapsed := time.Since(failedTime); elapsed < dur {
+		if logger.V(4) {
+			logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
+		}
+		return false
+	}
 	if ok, _ := hb.healthCheck(addr.Addr); ok {
 		hb.mu.Lock()
 		delete(hb.unhealthy, addr.Addr)
 		hb.mu.Unlock()
 		if logger.V(4) {
-			logger.Infof("clientv3/balancer: %s is healthy", addr.Addr)
+			logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
 		}
 		return true
 	}
@@ -193,7 +210,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.unhealthy[addr.Addr] = time.Now()
 	hb.mu.Unlock()
 	if logger.V(4) {
-		logger.Infof("clientv3/balancer: %s becomes unhealthy", addr.Addr)
+		logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
 	}
 	return false
 }

From fbed568b6ab4af24e9389ed2006df87055a1e0cc Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Sat, 7 Oct 2017 06:22:56 -0700
Subject: [PATCH 2/5] clientv3/balancer: mark partitioned member as unhealthy

Previously, when the server returned errors, the retry wrapper did
nothing and passively expected the balancer to gray-list the isolated
endpoint. This is problematic when multiple endpoints are passed and
a network partition happens.

This patch adds an 'endpointError' method to the 'balancer' interface
to actively handle RPC errors (possibly even before the health-check
API gets called) and gray-list endpoints for the time being, thus
speeding up the endpoint switch.

This is safe in the single-endpoint case, because the balancer will
retry no matter what in that case.

Signed-off-by: Gyu-Ho Lee
---
 clientv3/balancer.go        | 4 ++++
 clientv3/health_balancer.go | 9 +++++++++
 clientv3/retry.go           | 2 ++
 3 files changed, 15 insertions(+)

diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index 35d8b3a4a..cf7419b54 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -44,6 +44,8 @@ type balancer interface {
 	endpoints() []string
 	// pinned returns the current pinned endpoint.
 	pinned() string
+	// endpointError handles error from server-side.
+	endpointError(addr string, err error)
 
 	// up is Up but includes whether the balancer will use the connection.
 	up(addr grpc.Address) (func(error), bool)
@@ -150,6 +152,8 @@ func (b *simpleBalancer) pinned() string {
 	return b.pinAddr
 }
 
+func (b *simpleBalancer) endpointError(addr string, err error) { return }
+
 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
 	for i := range eps {

diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go
index f3be1b64e..bac9a69f6 100644
--- a/clientv3/health_balancer.go
+++ b/clientv3/health_balancer.go
@@ -177,6 +177,15 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 	return addrs
 }
 
+func (hb *healthBalancer) endpointError(addr string, err error) {
+	hb.mu.Lock()
+	hb.unhealthy[addr] = time.Now()
+	hb.mu.Unlock()
+	if logger.V(4) {
+		logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
+	}
+}
+
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.mu.RLock()
 	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0

diff --git a/clientv3/retry.go b/clientv3/retry.go
index d6681441e..d33fff92d 100644
--- a/clientv3/retry.go
+++ b/clientv3/retry.go
@@ -66,6 +66,8 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 			if logger.V(4) {
 				logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned)
 			}
+			// mark this before endpoint switch is triggered
+			c.balancer.endpointError(pinned, err)
 			notify := c.balancer.ConnectNotify()
 			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
 				c.balancer.next()

From 8224c748c9378089c073da30188768a81cd843d7 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Mon, 9 Oct 2017 11:23:30 -0700
Subject: [PATCH 3/5] clientv3/integration: add balancer network partition tests

Signed-off-by: Gyu-Ho Lee
---
 .../integration/network_partition_test.go | 97 +++++++++++++++++++
 1 file changed, 97 insertions(+)
 create mode 100644 clientv3/integration/network_partition_test.go

diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go
new file mode 100644
index 000000000..f2a737cf5
--- /dev/null
+++ b/clientv3/integration/network_partition_test.go
@@ -0,0 +1,97 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !cluster_proxy
+
+package integration
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/integration"
+	"github.com/coreos/etcd/pkg/testutil"
+)
+
+// TestNetworkPartitionBalancerPut tests when one member becomes isolated,
+// first Put request fails, and following retry succeeds with client balancer
+// switching to others.
+func TestNetworkPartitionBalancerPut(t *testing.T) {
+	testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error {
+		_, err := cli.Put(ctx, "a", "b")
+		return err
+	})
+}
+
+// TestNetworkPartitionBalancerGet tests when one member becomes isolated,
+// first Get request fails, and following retry succeeds with client balancer
+// switching to others.
+func TestNetworkPartitionBalancerGet(t *testing.T) {
+	testNetworkPartitionBalancer(t, func(cli *clientv3.Client, ctx context.Context) error {
+		_, err := cli.Get(ctx, "a")
+		return err
+	})
+}
+
+func testNetworkPartitionBalancer(t *testing.T, op func(*clientv3.Client, context.Context) error) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
+		Size:                 3,
+		GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
+	})
+	defer clus.Terminate(t)
+
+	// expect pin ep[0]
+	ccfg := clientv3.Config{
+		Endpoints:            []string{clus.Members[0].GRPCAddr()},
+		DialTimeout:          3 * time.Second,
+		DialKeepAliveTime:    2 * time.Second,
+		DialKeepAliveTimeout: 2 * time.Second,
+	}
+	cli, err := clientv3.New(ccfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+
+	// add other endpoints for later endpoint switch
+	cli.SetEndpoints(clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr())
+
+	time.Sleep(3 * time.Second)
+	clus.Members[0].InjectPartition(t, clus.Members[1:])
+	defer clus.Members[0].RecoverPartition(t, clus.Members[1:])
+
+	for i := 0; i < 2; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		err = op(cli, ctx)
+		cancel()
+		if err == nil {
+			break
+		}
+		if err != context.DeadlineExceeded {
+			t.Fatalf("#%d: expected %v, got %v", i, context.DeadlineExceeded, err)
+		}
+		// give enough time for endpoint switch
+		// TODO: remove random sleep by syncing directly with balancer
+		if i == 0 {
+			time.Sleep(5 * time.Second)
+		}
+	}
+	if err != nil {
+		t.Fatalf("balancer did not switch in time (%v)", err)
+	}
+}

From 826de3c07ad278a48844430b873835b94f20ff4e Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Mon, 9 Oct 2017 11:24:28 -0700
Subject: [PATCH 4/5] words: whitelist more words

Signed-off-by: Gyu-Ho Lee
---
 .words | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/.words b/.words
index b0957409b..66f5340b4 100644
--- a/.words
+++ b/.words
@@ -1,5 +1,8 @@
+ErrCodeEnhanceYourCalm
+GoAway
 RPC
 RPCs
+backoff
 blackholed
 cancelable
 cancelation
@@ -10,6 +13,7 @@ etcd
 gRPC
 goroutine
 goroutines
+healthcheck
 iff
 inflight
 keepalive

From e9e17e3fe51934f9bc405b12c5151df283856c91 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Mon, 9 Oct 2017 15:33:14 -0700
Subject: [PATCH 5/5] clientv3: pin any endpoint when all unhealthy

Signed-off-by: Gyu-Ho Lee
---
 clientv3/health_balancer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go
index bac9a69f6..6f01c25f6 100644
--- a/clientv3/health_balancer.go
+++ b/clientv3/health_balancer.go
@@ -188,7 +188,7 @@ func (hb *healthBalancer) endpointError(addr string, err error) {
 
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.mu.RLock()
-	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
+	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
 	failedTime, bad := hb.unhealthy[addr.Addr]
 	dur := hb.healthCheckTimeout
 	hb.mu.RUnlock()
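
The five patches above converge on a single policy: an endpoint is gray-listed the moment an RPC against it fails (endpointError, PATCH 2/5), mayPin refuses to re-pin it until the health-check timeout has elapsed since that failure (PATCH 1/5), and the refusal is waived when every endpoint is gray-listed so the client can still make progress (PATCH 5/5). Below is a minimal, self-contained Go sketch of that policy. The names graylist, newGraylist, retryTimeout, and the sample addresses are illustrative only, not clientv3 identifiers, and the actual gRPC health-check RPC is omitted.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// graylist tracks endpoints that recently failed, mirroring the role of
// healthBalancer.unhealthy in the patches: an endpoint that failed less
// than retryTimeout ago is not eligible to be pinned again.
type graylist struct {
	mu           sync.RWMutex
	failed       map[string]time.Time // endpoint -> time of last failure
	numEndpoints int
	retryTimeout time.Duration
}

func newGraylist(numEndpoints int, retryTimeout time.Duration) *graylist {
	return &graylist{
		failed:       make(map[string]time.Time),
		numEndpoints: numEndpoints,
		retryTimeout: retryTimeout,
	}
}

// endpointError plays the role of balancer.endpointError: gray-list the
// endpoint as soon as an RPC against it fails, before any health check runs.
func (g *graylist) endpointError(ep string, err error) {
	g.mu.Lock()
	g.failed[ep] = time.Now()
	g.mu.Unlock()
}

// mayPin mirrors the gray-listing decision in healthBalancer.mayPin
// (the health-check call itself is left out of this sketch).
func (g *graylist) mayPin(ep string) bool {
	g.mu.RLock()
	defer g.mu.RUnlock()
	// Single endpoint, nothing gray-listed, or everything gray-listed:
	// pin anyway so the client keeps making progress (PATCH 5/5).
	if g.numEndpoints == 1 || len(g.failed) == 0 || len(g.failed) == g.numEndpoints {
		return true
	}
	last, bad := g.failed[ep]
	if !bad {
		return true
	}
	// Refuse to re-pin until retryTimeout has elapsed since the last failure,
	// even if the member's health-check server still reports SERVING (PATCH 1/5).
	return time.Since(last) >= g.retryTimeout
}

func main() {
	g := newGraylist(3, 3*time.Second)
	g.endpointError("127.0.0.1:2379", errors.New("grpc: failed with network I/O error"))
	fmt.Println(g.mayPin("127.0.0.1:2379"))  // false: failed too recently
	fmt.Println(g.mayPin("127.0.0.1:22379")) // true: not gray-listed
}

The third clause of the skip condition is the point of PATCH 5/5: once every endpoint has been gray-listed, refusing to pin anything would leave the client with no connection at all, so pinning any endpoint is allowed again.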