diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index d74ff23bb..9329a6e72 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -41,7 +41,6 @@ type cluster struct { stressQPS int stressKeySize int stressKeySuffixRange int - stressKeyRangeLimit int Size int Stressers []Stresser @@ -114,7 +113,6 @@ func (c *cluster) bootstrap(agentEndpoints []string) error { Endpoint: m.grpcAddr(), keySize: c.stressKeySize, keySuffixRange: c.stressKeySuffixRange, - keyRangeLimit: c.stressKeyRangeLimit, N: stressN, rateLimiter: limiter, } diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index ec2ae97f0..627a7bfca 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -31,7 +31,6 @@ func main() { datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.") stressKeySize := flag.Uint("stress-key-size", 100, "the size of each key written into etcd.") stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.") - stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.") limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).") stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.") schedCases := flag.String("schedule-cases", "", "test case schedule") @@ -45,7 +44,6 @@ func main() { stressQPS: *stressQPS, stressKeySize: int(*stressKeySize), stressKeySuffixRange: int(*stressKeySuffixRange), - stressKeyRangeLimit: int(*stressKeyRangeLimit), } if err := c.bootstrap(strings.Split(*endpointStr, ",")); err != nil { plog.Fatal(err) diff --git a/tools/functional-tester/etcd-tester/member.go b/tools/functional-tester/etcd-tester/member.go index a1ac72371..5d5a12568 100644 --- a/tools/functional-tester/etcd-tester/member.go +++ b/tools/functional-tester/etcd-tester/member.go @@ -123,7 +123,8 @@ func (m *member) SetHealthKeyV3() error { return fmt.Errorf("%v (%s)", err, m.ClientURL) } defer cli.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + // give enough time-out in case expensive requests (range/delete) are pending + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err = cli.Put(ctx, "health", "good") cancel() if err != nil { diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index 2f165567c..a4c9cc6c6 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -74,7 +74,7 @@ func (st *stressTable) choose() stressFunc { func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { return func(ctx context.Context) error { _, err := kvc.Put(ctx, &pb.PutRequest{ - Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), Value: randBytes(keySize), }, grpc.FailFast(false)) return err @@ -84,18 +84,19 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) error { _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), }, grpc.FailFast(false)) return err } } -func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc { +func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) error { + start := rand.Intn(keySuffixRange) + end := start + 500 _, err := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte("foo"), - RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)), - Limit: int64(keyRangeLimit), + Key: []byte(fmt.Sprintf("foo%016x", start)), + RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), }, grpc.FailFast(false)) return err } @@ -104,29 +105,19 @@ func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) error { _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))), }, grpc.FailFast(false)) return err } } -func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc { +func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) error { - resp, _ := kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte("foo"), - RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)), - Limit: int64(keyRangeLimit), - }, grpc.FailFast(false)) - - start, end := []byte("foo"), []byte(fmt.Sprintf("foo%d", keyRangeLimit)) - if resp != nil && resp.Count > 0 { - start = resp.Kvs[0].Key - end = resp.Kvs[len(resp.Kvs)-1].Key - } - + start := rand.Intn(keySuffixRange) + end := start + 500 _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ - Key: start, - RangeEnd: end, + Key: []byte(fmt.Sprintf("foo%016x", start)), + RangeEnd: []byte(fmt.Sprintf("foo%016x", end)), }, grpc.FailFast(false)) return err } @@ -146,7 +137,6 @@ type stresser struct { keySize int keySuffixRange int - keyRangeLimit int N int @@ -190,9 +180,9 @@ func (s *stresser) Stress() error { var stressEntries = []stressEntry{ {weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)}, {weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange, s.keyRangeLimit)}, + {weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)}, {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, - {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange, s.keyRangeLimit)}, + {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, } s.stressTable = createStressTable(stressEntries) @@ -255,15 +245,16 @@ func (s *stresser) run(ctx context.Context) { // capability check has not been done (in the beginning) continue - // default: - // errors from stresser.Cancel method: - // rpc error: code = 1 desc = context canceled (type grpc.rpcError) - // rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError) case rpctypes.ErrTooManyRequests.Error(): // hitting the recovering member. continue case context.Canceled.Error(): + // from stresser.Cancel method: + return + + case grpc.ErrClientConnClosing.Error(): + // from stresser.Cancel method: return } @@ -335,7 +326,7 @@ func (s *stresserV2) Stress() error { go func() { for { setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout) - key := fmt.Sprintf("foo%d", rand.Intn(s.keySuffixRange)) + key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange)) _, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil) setcancel() if err == context.Canceled { diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index e1613da0c..c00167376 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -52,6 +52,8 @@ func (tt *tester) runLoop() { } continue } + // -1 so that logPrefix doesn't print out 'case' + tt.status.setCase(-1) revToCompact := max(0, tt.currentRevision-10000) compactN := revToCompact - prevCompactRev @@ -80,9 +82,6 @@ func (tt *tester) runLoop() { } func (tt *tester) doRound(round int) (bool, error) { - // -1 so that logPrefix doesn't print out 'case' - defer tt.status.setCase(-1) - for j, f := range tt.failures { caseTotalCounter.WithLabelValues(f.Desc()).Inc() tt.status.setCase(j)