mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcd-tester: stress with range, delete
This commit is contained in:
parent
7fbc1e39a6
commit
969bcd282b
@ -37,6 +37,88 @@ func init() {
|
||||
grpclog.SetLogger(plog)
|
||||
}
|
||||
|
||||
type stressFunc func(ctx context.Context) error
|
||||
|
||||
type stressEntry struct {
|
||||
weight float32
|
||||
f stressFunc
|
||||
}
|
||||
|
||||
type stressTable struct {
|
||||
entries []stressEntry
|
||||
sumWeights float32
|
||||
}
|
||||
|
||||
func createStressTable(entries []stressEntry) *stressTable {
|
||||
st := stressTable{entries: entries}
|
||||
for _, entry := range st.entries {
|
||||
st.sumWeights += entry.weight
|
||||
}
|
||||
return &st
|
||||
}
|
||||
|
||||
func (st *stressTable) choose() stressFunc {
|
||||
v := rand.Float32() * st.sumWeights
|
||||
var sum float32
|
||||
var idx int
|
||||
for i := range st.entries {
|
||||
sum += st.entries[i].weight
|
||||
if sum >= v {
|
||||
idx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
return st.entries[idx].f
|
||||
}
|
||||
|
||||
func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
||||
Value: randBytes(keySize),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRangePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte("foo"),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDeletePrefix(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte("foo"),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
type Stresser interface {
|
||||
// Stress starts to stress the etcd cluster
|
||||
Stress() error
|
||||
@ -64,6 +146,8 @@ type stresser struct {
|
||||
conn *grpc.ClientConn
|
||||
|
||||
success int
|
||||
|
||||
stressTable *stressTable
|
||||
}
|
||||
|
||||
func (s *stresser) Stress() error {
|
||||
@ -86,6 +170,15 @@ func (s *stresser) Stress() error {
|
||||
|
||||
kvc := pb.NewKVClient(conn)
|
||||
|
||||
var stressEntries = []stressEntry{
|
||||
{weight: 0.7, f: newStressPut(kvc, s.KeySuffixRange, s.KeySize)},
|
||||
{weight: 0.07, f: newStressRange(kvc, s.KeySuffixRange)},
|
||||
{weight: 0.07, f: newStressRangePrefix(kvc, s.KeySuffixRange)},
|
||||
{weight: 0.07, f: newStressDelete(kvc, s.KeySuffixRange)},
|
||||
{weight: 0.07, f: newStressDeletePrefix(kvc, s.KeySuffixRange)},
|
||||
}
|
||||
s.stressTable = createStressTable(stressEntries)
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
go s.run(ctx, kvc)
|
||||
}
|
||||
@ -105,13 +198,12 @@ func (s *stresser) run(ctx context.Context, kvc pb.KVClient) {
|
||||
// TODO: 10-second is enough timeout to cover leader failure
|
||||
// and immediate leader election. Find out what other cases this
|
||||
// could be timed out.
|
||||
putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
_, err := kvc.Put(putctx, &pb.PutRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
|
||||
Value: []byte(randStr(s.KeySize)),
|
||||
},
|
||||
grpc.FailFast(false))
|
||||
putcancel()
|
||||
sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
|
||||
err := s.stressTable.choose()(sctx)
|
||||
|
||||
scancel()
|
||||
|
||||
if err != nil {
|
||||
shouldContinue := false
|
||||
switch grpc.ErrorDesc(err) {
|
||||
@ -217,7 +309,7 @@ func (s *stresserV2) Stress() error {
|
||||
for {
|
||||
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
||||
key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))
|
||||
_, err := kv.Set(setctx, key, randStr(s.KeySize), nil)
|
||||
_, err := kv.Set(setctx, key, string(randBytes(s.KeySize)), nil)
|
||||
setcancel()
|
||||
if err == context.Canceled {
|
||||
return
|
||||
@ -247,10 +339,10 @@ func (s *stresserV2) Report() (success int, failure int) {
|
||||
return s.success, s.failure
|
||||
}
|
||||
|
||||
func randStr(size int) string {
|
||||
func randBytes(size int) []byte {
|
||||
data := make([]byte, size)
|
||||
for i := 0; i < size; i++ {
|
||||
data[i] = byte(int('a') + rand.Intn(26))
|
||||
}
|
||||
return string(data)
|
||||
return data
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user