diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index d578b5958..631de628c 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -160,6 +160,7 @@ type stresser struct { conn *grpc.ClientConn success int + failure int stressTable *stressTable } @@ -219,7 +220,10 @@ func (s *stresser) run(ctx context.Context) { scancel() if err != nil { - shouldContinue := false + s.mu.Lock() + s.failure++ + s.mu.Unlock() + switch grpc.ErrorDesc(err) { case context.DeadlineExceeded.Error(): // This retries when request is triggered at the same time as @@ -228,37 +232,45 @@ func (s *stresser) run(ctx context.Context) { // to followers cannot be forwarded to the old leader, so timing out // as well. We want to keep stressing until the cluster elects a // new leader and start processing requests again. - shouldContinue = true + continue case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error(): // This retries when request is triggered at the same time as // leader failure and follower nodes receive time out errors // from losing their leader. Followers should retry to connect // to the new leader. - shouldContinue = true + continue case etcdserver.ErrStopped.Error(): // one of the etcd nodes stopped from failure injection - shouldContinue = true + continue case transport.ErrConnClosing.Desc: // server closed the transport (failure injected node) - shouldContinue = true + continue case rpctypes.ErrNotCapable.Error(): // capability check has not been done (in the beginning) - shouldContinue = true + continue // default: // errors from stresser.Cancel method: // rpc error: code = 1 desc = context canceled (type grpc.rpcError) // rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError) - } - if shouldContinue { + case rpctypes.ErrTooManyRequests.Error(): + // hitting the recovering member. continue + + case context.Canceled.Error(): + return } + + su, fa := s.Report() + plog.Warningf("stresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err) + return } + s.mu.Lock() s.success++ s.mu.Unlock() @@ -279,8 +291,7 @@ func (s *stresser) Cancel() { func (s *stresser) Report() (int, int) { s.mu.Lock() defer s.mu.Unlock() - // TODO: find a better way to report v3 tests - return s.success, -1 + return s.success, s.failure } type stresserV2 struct {