diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go
index 025e47ea9..a7df4a12c 100644
--- a/tools/functional-tester/etcd-tester/cluster.go
+++ b/tools/functional-tester/etcd-tester/cluster.go
@@ -36,6 +36,7 @@ type cluster struct {
 	v2Only bool // to be deprecated
 
 	datadir              string
+	stressQPS            int
 	stressKeySize        int
 	stressKeySuffixRange int
 
@@ -50,10 +51,11 @@ type ClusterStatus struct {
 }
 
 // newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down.
-func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
+func newCluster(agentEndpoints []string, datadir string, stressQPS, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) {
 	c := &cluster{
 		v2Only:               isV2Only,
 		datadir:              datadir,
+		stressQPS:            stressQPS,
 		stressKeySize:        stressKeySize,
 		stressKeySuffixRange: stressKeySuffixRange,
 	}
@@ -123,6 +125,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
 			Endpoint:       m.grpcAddr(),
 			KeySize:        c.stressKeySize,
 			KeySuffixRange: c.stressKeySuffixRange,
+			qps:            c.stressQPS,
 			N:              stressN,
 		}
 	}
diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go
index 07adbc1b7..2cc9524af 100644
--- a/tools/functional-tester/etcd-tester/main.go
+++ b/tools/functional-tester/etcd-tester/main.go
@@ -32,13 +32,14 @@ func main() {
 	stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
 	stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
+	stressQPS := flag.Int("stress-qps", 5000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")
 	consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
 	isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
 	flag.Parse()
 
 	endpoints := strings.Split(*endpointStr, ",")
-	c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange, *isV2Only)
+	c, err := newCluster(endpoints, *datadir, *stressQPS, *stressKeySize, *stressKeySuffixRange, *isV2Only)
 	if err != nil {
 		plog.Fatal(err)
 	}
diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go
index eebe5384d..f8d7818ee 100644
--- a/tools/functional-tester/etcd-tester/stresser.go
+++ b/tools/functional-tester/etcd-tester/stresser.go
@@ -27,6 +27,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/transport"
@@ -51,10 +52,14 @@ type stresser struct {
 	KeySize        int
 	KeySuffixRange int
 
-	N int
+	qps int
+	N   int
+
+	mu sync.Mutex
+	wg *sync.WaitGroup
+
+	rateLimiter *rate.Limiter
 
-	mu sync.Mutex
-	wg *sync.WaitGroup
 	cancel func()
 	conn   *grpc.ClientConn
 
@@ -77,75 +82,83 @@ func (s *stresser) Stress() error {
 	s.conn = conn
 	s.cancel = cancel
 	s.wg = wg
+	s.rateLimiter = rate.NewLimiter(rate.Limit(s.qps), s.qps)
 	s.mu.Unlock()
 
 	kvc := pb.NewKVClient(conn)
 
 	for i := 0; i < s.N; i++ {
-		go func(i int) {
-			defer wg.Done()
-			for {
-				// TODO: 10-second is enough timeout to cover leader failure
-				// and immediate leader election. Find out what other cases this
-				// could be timed out.
-				putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
-				_, err := kvc.Put(putctx, &pb.PutRequest{
-					Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
-					Value: []byte(randStr(s.KeySize)),
-				})
-				putcancel()
-				if err != nil {
-					shouldContinue := false
-					switch grpc.ErrorDesc(err) {
-					case context.DeadlineExceeded.Error():
-						// This retries when request is triggered at the same time as
-						// leader failure. When we terminate the leader, the request to
-						// that leader cannot be processed, and times out. Also requests
-						// to followers cannot be forwarded to the old leader, so timing out
-						// as well. We want to keep stressing until the cluster elects a
-						// new leader and start processing requests again.
-						shouldContinue = true
-
-					case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-						// This retries when request is triggered at the same time as
-						// leader failure and follower nodes receive time out errors
-						// from losing their leader. Followers should retry to connect
-						// to the new leader.
-						shouldContinue = true
-
-					case etcdserver.ErrStopped.Error():
-						// one of the etcd nodes stopped from failure injection
-						shouldContinue = true
-
-					case transport.ErrConnClosing.Desc:
-						// server closed the transport (failure injected node)
-						shouldContinue = true
-
-					case rpctypes.ErrNotCapable.Error():
-						// capability check has not been done (in the beginning)
-						shouldContinue = true
-
-					// default:
-					// errors from stresser.Cancel method:
-					// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
-					// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
-					}
-					if shouldContinue {
-						continue
-					}
-					return
-				}
-				s.mu.Lock()
-				s.success++
-				s.mu.Unlock()
-			}
-		}(i)
+		go s.run(ctx, kvc)
 	}
 
 	<-ctx.Done()
 	return nil
 }
 
+func (s *stresser) run(ctx context.Context, kvc pb.KVClient) {
+	defer s.wg.Done()
+
+	for {
+		if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
+			return
+		}
+
+		// TODO: a 10-second timeout is enough to cover leader failure
+		// and immediate leader election. Find out what other cases
+		// could time out.
+		putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
+		_, err := kvc.Put(putctx, &pb.PutRequest{
+			Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
+			Value: []byte(randStr(s.KeySize)),
+		})
+		putcancel()
+		if err != nil {
+			shouldContinue := false
+			switch grpc.ErrorDesc(err) {
+			case context.DeadlineExceeded.Error():
+				// This retries when a request is triggered at the same time as
+				// a leader failure. When we terminate the leader, the request to
+				// that leader cannot be processed and times out. Requests to
+				// followers cannot be forwarded to the old leader, so they time
+				// out as well. We want to keep stressing until the cluster
+				// elects a new leader and starts processing requests again.
+				shouldContinue = true
+
+			case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+				// This retries when a request is triggered at the same time as
+				// a leader failure and follower nodes receive timeout errors
+				// after losing their leader. Followers should retry to connect
+				// to the new leader.
+				shouldContinue = true
+
+			case etcdserver.ErrStopped.Error():
+				// one of the etcd nodes stopped from failure injection
+				shouldContinue = true
+
+			case transport.ErrConnClosing.Desc:
+				// server closed the transport (failure injected node)
+				shouldContinue = true
+
+			case rpctypes.ErrNotCapable.Error():
+				// capability check has not been done (in the beginning)
+				shouldContinue = true
+
+			// default:
+			// errors from stresser.Cancel method:
+			// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
+			// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
+			}
+			if shouldContinue {
+				continue
+			}
+			return
+		}
+		s.mu.Lock()
+		s.success++
+		s.mu.Unlock()
+	}
+}
+
 func (s *stresser) Cancel() {
 	s.mu.Lock()
 	cancel, conn, wg := s.cancel, s.conn, s.wg
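
Note: run funnels every Put through a single rate.Limiter shared by all stresser goroutines, so -stress-qps bounds the aggregate request rate of the whole stresser rather than each goroutine individually. Below is a minimal, self-contained sketch of that pattern; it is illustrative only and not part of the patch. The qps value, worker count, and timings are made up, and only NewLimiter, Limit, and Wait from golang.org/x/time/rate are assumed.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	qps := 10 // illustrative; the tester defaults to 5000

	// rate.Limit(qps) refills qps tokens per second; the second argument is
	// the burst, the largest number of tokens that can be consumed at once.
	limiter := rate.NewLimiter(rate.Limit(qps), qps)

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	for i := 0; i < 3; i++ { // a few workers sharing one limiter
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				// Wait blocks until a token is available and returns a
				// non-nil error once ctx is canceled, ending the worker.
				if err := limiter.Wait(ctx); err != nil {
					return
				}
				fmt.Printf("worker %d sent a request at %v\n", id, time.Now())
			}
		}(i)
	}

	time.Sleep(2 * time.Second) // let the workers run briefly
	cancel()                    // analogous to stresser.Cancel
	wg.Wait()
}

Sharing one limiter is the reason rateLimiter lives on the stresser struct: a limiter created per goroutine would multiply the effective ceiling by N.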