From 969bcd282ba6e7c3c78235b15362e9fd8d664314 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 18 Jul 2016 15:03:09 -0700 Subject: [PATCH] etcd-tester: stress with range, delete --- .../functional-tester/etcd-tester/stresser.go | 112 ++++++++++++++++-- 1 file changed, 102 insertions(+), 10 deletions(-) diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index f78c392d4..252b419a6 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -37,6 +37,88 @@ func init() { grpclog.SetLogger(plog) } +type stressFunc func(ctx context.Context) error + +type stressEntry struct { + weight float32 + f stressFunc +} + +type stressTable struct { + entries []stressEntry + sumWeights float32 +} + +func createStressTable(entries []stressEntry) *stressTable { + st := stressTable{entries: entries} + for _, entry := range st.entries { + st.sumWeights += entry.weight + } + return &st +} + +func (st *stressTable) choose() stressFunc { + v := rand.Float32() * st.sumWeights + var sum float32 + var idx int + for i := range st.entries { + sum += st.entries[i].weight + if sum >= v { + idx = i + break + } + } + return st.entries[idx].f +} + +func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { + return func(ctx context.Context) error { + _, err := kvc.Put(ctx, &pb.PutRequest{ + Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + Value: randBytes(keySize), + }, grpc.FailFast(false)) + return err + } +} + +func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) error { + _, err := kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err + } +} + +func newStressRangePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) error { + _, err := kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte("foo"), + RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err + } +} + +func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) error { + _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err + } +} + +func newStressDeletePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc { + return func(ctx context.Context) error { + _, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{ + Key: []byte("foo"), + RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))), + }, grpc.FailFast(false)) + return err + } +} + type Stresser interface { // Stress starts to stress the etcd cluster Stress() error @@ -64,6 +146,8 @@ type stresser struct { conn *grpc.ClientConn success int + + stressTable *stressTable } func (s *stresser) Stress() error { @@ -86,6 +170,15 @@ func (s *stresser) Stress() error { kvc := pb.NewKVClient(conn) + var stressEntries = []stressEntry{ + {weight: 0.7, f: newStressPut(kvc, s.KeySuffixRange, s.KeySize)}, + {weight: 0.07, f: newStressRange(kvc, s.KeySuffixRange)}, + {weight: 0.07, f: newStressRangePrefix(kvc, s.KeySuffixRange)}, + {weight: 0.07, f: newStressDelete(kvc, s.KeySuffixRange)}, + {weight: 0.07, f: newStressDeletePrefix(kvc, s.KeySuffixRange)}, + } + s.stressTable = createStressTable(stressEntries) + for i := 0; i < s.N; i++ { go s.run(ctx, kvc) } @@ -105,13 +198,12 @@ func (s *stresser) run(ctx context.Context, kvc pb.KVClient) { // TODO: 10-second is enough timeout to cover leader failure // and immediate leader election. Find out what other cases this // could be timed out. - putctx, putcancel := context.WithTimeout(ctx, 10*time.Second) - _, err := kvc.Put(putctx, &pb.PutRequest{ - Key: []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))), - Value: []byte(randStr(s.KeySize)), - }, - grpc.FailFast(false)) - putcancel() + sctx, scancel := context.WithTimeout(ctx, 10*time.Second) + + err := s.stressTable.choose()(sctx) + + scancel() + if err != nil { shouldContinue := false switch grpc.ErrorDesc(err) { @@ -217,7 +309,7 @@ func (s *stresserV2) Stress() error { for { setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout) key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange)) - _, err := kv.Set(setctx, key, randStr(s.KeySize), nil) + _, err := kv.Set(setctx, key, string(randBytes(s.KeySize)), nil) setcancel() if err == context.Canceled { return @@ -247,10 +339,10 @@ func (s *stresserV2) Report() (success int, failure int) { return s.success, s.failure } -func randStr(size int) string { +func randBytes(size int) []byte { data := make([]byte, size) for i := 0; i < size; i++ { data[i] = byte(int('a') + rand.Intn(26)) } - return string(data) + return data }