From 520bd5084e3120e0eb45963446e798b1f3211d2f Mon Sep 17 00:00:00 2001
From: Matt Brannock
Date: Wed, 18 Jul 2018 13:09:27 -0700
Subject: [PATCH] integration, functional: Eliminate direct use of gRPC
 transport pkg

gRPC has moved the transport package to an internal-only directory. This
eliminates direct use of the transport package in the stress test in favor
of the gRPC status code that represents a connection problem.

https://godoc.org/google.golang.org/grpc/internal/transport is the new
location of the package, and its documentation says it is not intended to
be imported directly. Instead, the maintainers suggested using the
Unavailable status code to detect a connection problem.

This change slightly reorganizes the stresser test error handling.
---
 functional/tester/stresser_key.go    | 97 +++++++++++++++++-----------
 integration/v3_grpc_inflight_test.go |  8 ++-
 2 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/functional/tester/stresser_key.go b/functional/tester/stresser_key.go
index 54efddb28..b3e46cc0e 100644
--- a/functional/tester/stresser_key.go
+++ b/functional/tester/stresser_key.go
@@ -32,7 +32,8 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type keyStresser struct {
@@ -128,44 +129,7 @@ func (s *keyStresser) run() {
 			continue
 		}
 
-		switch rpctypes.ErrorDesc(err) {
-		case context.DeadlineExceeded.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure. When we terminate the leader, the request to
-			// that leader cannot be processed, and times out. Also requests
-			// to followers cannot be forwarded to the old leader, so timing out
-			// as well. We want to keep stressing until the cluster elects a
-			// new leader and start processing requests again.
-		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure and follower nodes receive time out errors
-			// from losing their leader. Followers should retry to connect
-			// to the new leader.
-		case etcdserver.ErrStopped.Error():
-			// one of the etcd nodes stopped from failure injection
-		case transport.ErrConnClosing.Desc:
-			// server closed the transport (failure injected node)
-		case rpctypes.ErrNotCapable.Error():
-			// capability check has not been done (in the beginning)
-		case rpctypes.ErrTooManyRequests.Error():
-			// hitting the recovering member.
-		case raft.ErrProposalDropped.Error():
-			// removed member, or leadership has changed (old leader got raftpb.MsgProp)
-		case context.Canceled.Error():
-			// from stresser.Cancel method:
-			return
-		case grpc.ErrClientConnClosing.Error():
-			// from stresser.Cancel method:
-			return
-		default:
-			s.lg.Warn(
-				"stress run exiting",
-				zap.String("stress-type", "KV"),
-				zap.String("endpoint", s.m.EtcdClientEndpoint),
-				zap.String("error-type", reflect.TypeOf(err).String()),
-				zap.String("error-desc", rpctypes.ErrorDesc(err)),
-				zap.Error(err),
-			)
+		if !s.isRetryableError(err) {
 			return
 		}
 
@@ -178,6 +142,61 @@ func (s *keyStresser) run() {
 	}
 }
 
+func (s *keyStresser) isRetryableError(err error) bool {
+	switch rpctypes.ErrorDesc(err) {
+	// retryable
+	case context.DeadlineExceeded.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure. When we terminate the leader, the request to
+		// that leader cannot be processed, and times out. Also requests
+		// to followers cannot be forwarded to the old leader, so timing out
+		// as well. We want to keep stressing until the cluster elects a
+		// new leader and start processing requests again.
+		return true
+	case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure and follower nodes receive time out errors
+		// from losing their leader. Followers should retry to connect
+		// to the new leader.
+		return true
+	case etcdserver.ErrStopped.Error():
+		// one of the etcd nodes stopped from failure injection
+		return true
+	case rpctypes.ErrNotCapable.Error():
+		// capability check has not been done (in the beginning)
+		return true
+	case rpctypes.ErrTooManyRequests.Error():
+		// hitting the recovering member.
+		return true
+	case raft.ErrProposalDropped.Error():
+		// removed member, or leadership has changed (old leader got raftpb.MsgProp)
+		return true
+
+	// not retryable.
+	case context.Canceled.Error():
+		// from stresser.Cancel method:
+		return false
+	case grpc.ErrClientConnClosing.Error():
+		// from stresser.Cancel method:
+		return false
+	}
+
+	if status.Convert(err).Code() == codes.Unavailable {
+		// gRPC connection errors are translated to status.Unavailable
+		return true
+	}
+
+	s.lg.Warn(
+		"stress run exiting",
+		zap.String("stress-type", "KV"),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+		zap.String("error-type", reflect.TypeOf(err).String()),
+		zap.String("error-desc", rpctypes.ErrorDesc(err)),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (s *keyStresser) Pause() map[string]int {
 	return s.Close()
 }
diff --git a/integration/v3_grpc_inflight_test.go b/integration/v3_grpc_inflight_test.go
index 08c1a1f23..7b4d12a12 100644
--- a/integration/v3_grpc_inflight_test.go
+++ b/integration/v3_grpc_inflight_test.go
@@ -25,7 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
@@ -82,9 +83,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 		defer wg.Done()
 		_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 		if err != nil {
+			errCode := status.Convert(err).Code()
 			errDesc := rpctypes.ErrorDesc(err)
-			if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) {
-				t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc)
+			if err != nil && !(errDesc == context.Canceled.Error() || errCode == codes.Unavailable) {
+				t.Fatalf("inflight request should be canceled with '%v' or code Unavailable, got '%v' with code '%s'", context.Canceled.Error(), errDesc, errCode)
 			}
 		}
 	}()
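
As an illustration of the detection pattern the commit message describes, here is a minimal standalone sketch, assuming only the google.golang.org/grpc status and codes packages already imported by the patch; the helper name isConnectionProblem is hypothetical and does not appear in the etcd code above.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isConnectionProblem reports whether err maps to the Unavailable gRPC status
// code, which is how connection-level failures (such as a closed transport)
// surface without importing the internal transport package.
// status.Convert never returns nil: a nil error converts to an OK status and
// a non-gRPC error converts to Unknown, so the check is safe for any error.
func isConnectionProblem(err error) bool {
	return status.Convert(err).Code() == codes.Unavailable
}

func main() {
	// A synthetic Unavailable error, standing in for what a client observes
	// when the server side of the connection goes away.
	err := status.Error(codes.Unavailable, "transport is closing")
	fmt.Println(isConnectionProblem(err)) // true
	fmt.Println(isConnectionProblem(nil)) // false
}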