Merge pull request #6126 from gyuho/tester

etcd-tester: fix tester for 5-node cluster

Commit bb28c9ab00
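In outline, the diff below makes four changes to the functional tester: stress keys move from a foo%d format to zero-padded foo%016x so that byte-wise key order matches numeric order; the interval range/delete stressers scan a bounded [start, start+500) window, retiring the keyRangeLimit field and its stress-range-limit flag; the v3 health-key put gets a 5-second timeout so pending expensive range/delete requests do not fail health checks; and the stresser's run loop now retries on ErrTooManyRequests from a recovering member and returns cleanly when canceled, while the tester resets its status case between rounds.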
@@ -41,7 +41,6 @@ type cluster struct {
 	stressQPS            int
 	stressKeySize        int
 	stressKeySuffixRange int
-	stressKeyRangeLimit  int
 
 	Size      int
 	Stressers []Stresser

@@ -114,7 +113,6 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
 			Endpoint:       m.grpcAddr(),
 			keySize:        c.stressKeySize,
 			keySuffixRange: c.stressKeySuffixRange,
-			keyRangeLimit:  c.stressKeyRangeLimit,
 			N:              stressN,
 			rateLimiter:    limiter,
 		}

@@ -31,7 +31,6 @@ func main() {
 	datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
 	stressKeySize := flag.Uint("stress-key-size", 100, "the size of each key written into etcd.")
 	stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
-	stressKeyRangeLimit := flag.Uint("stress-range-limit", 50, "maximum number of keys to range or delete.")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
 	schedCases := flag.String("schedule-cases", "", "test case schedule")

@@ -45,7 +44,6 @@ func main() {
 		stressQPS:            *stressQPS,
 		stressKeySize:        int(*stressKeySize),
 		stressKeySuffixRange: int(*stressKeySuffixRange),
-		stressKeyRangeLimit:  int(*stressKeyRangeLimit),
 	}
 	if err := c.bootstrap(strings.Split(*endpointStr, ",")); err != nil {
 		plog.Fatal(err)

@@ -123,7 +123,8 @@ func (m *member) SetHealthKeyV3() error {
 		return fmt.Errorf("%v (%s)", err, m.ClientURL)
 	}
 	defer cli.Close()
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	// give enough time-out in case expensive requests (range/delete) are pending
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	_, err = cli.Put(ctx, "health", "good")
 	cancel()
 	if err != nil {

@@ -74,7 +74,7 @@ func (st *stressTable) choose() stressFunc {
 func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
 	return func(ctx context.Context) error {
 		_, err := kvc.Put(ctx, &pb.PutRequest{
-			Key:   []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
+			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 			Value: randBytes(keySize),
 		}, grpc.FailFast(false))
 		return err

@@ -84,18 +84,19 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) error {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
+			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
 		return err
 	}
 }
 
-func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc {
+func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) error {
+		start := rand.Intn(keySuffixRange)
+		end := start + 500
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
-			Key:      []byte("foo"),
-			RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)),
-			Limit:    int64(keyRangeLimit),
+			Key:      []byte(fmt.Sprintf("foo%016x", start)),
+			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
 		return err
 	}

@@ -104,29 +105,19 @@ func newStressRangeInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int)
 func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) error {
 		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key: []byte(fmt.Sprintf("foo%d", rand.Intn(keySuffixRange))),
+			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
 		return err
 	}
 }
 
-func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange, keyRangeLimit int) stressFunc {
+func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) error {
-		resp, _ := kvc.Range(ctx, &pb.RangeRequest{
-			Key:      []byte("foo"),
-			RangeEnd: []byte(fmt.Sprintf("foo%d", keySuffixRange)),
-			Limit:    int64(keyRangeLimit),
-		}, grpc.FailFast(false))
-
-		start, end := []byte("foo"), []byte(fmt.Sprintf("foo%d", keyRangeLimit))
-		if resp != nil && resp.Count > 0 {
-			start = resp.Kvs[0].Key
-			end = resp.Kvs[len(resp.Kvs)-1].Key
-		}
-
+		start := rand.Intn(keySuffixRange)
+		end := start + 500
 		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
-			Key:      start,
-			RangeEnd: end,
+			Key:      []byte(fmt.Sprintf("foo%016x", start)),
+			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
 		return err
 	}

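The %016x change is the heart of this fix: etcd v3 ranges compare keys as bytes, so with plain decimal suffixes "foo10" sorts before "foo2" and [start, start+500) is not a contiguous key window; zero-padded fixed-width hex suffixes make byte order agree with numeric order, which lets the interval stressers bound each range/delete to roughly 500 keys and drop keyRangeLimit entirely. A standalone sketch (not part of the commit) demonstrating the ordering:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Decimal suffixes: byte-wise (lexicographic) order disagrees with numeric order.
	dec := []string{fmt.Sprintf("foo%d", 2), fmt.Sprintf("foo%d", 10)}
	sort.Strings(dec)
	fmt.Println(dec) // [foo10 foo2] -- "foo10" < "foo2" byte-wise

	// Zero-padded fixed-width hex suffixes: byte order matches numeric order,
	// so [foo%016x(start), foo%016x(start+500)) spans exactly the intended keys.
	hex := []string{fmt.Sprintf("foo%016x", 2), fmt.Sprintf("foo%016x", 10)}
	sort.Strings(hex)
	fmt.Println(hex) // [foo0000000000000002 foo000000000000000a]
}
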
@@ -146,7 +137,6 @@ type stresser struct {
 
 	keySize        int
 	keySuffixRange int
-	keyRangeLimit  int
 
 	N int
 
@@ -190,9 +180,9 @@ func (s *stresser) Stress() error {
 	var stressEntries = []stressEntry{
 		{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
 		{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange, s.keyRangeLimit)},
+		{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
 		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
-		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange, s.keyRangeLimit)},
+		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
 	}
 	s.stressTable = createStressTable(stressEntries)
 
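The entry weights feed createStressTable, whose choose presumably picks a stressFunc with probability proportional to weight; note the weights sum to 0.98, which is harmless as long as choose normalizes by the total. Neither function's body appears in this diff, so the following is only a plausible sketch with all names and details assumed:

package main

import (
	"fmt"
	"math/rand"
)

type stressEntry struct {
	weight float64
	name   string // stands in for the stressFunc of the real code
}

type stressTable struct {
	sumWeights float64
	entries    []stressEntry
}

func createStressTable(entries []stressEntry) *stressTable {
	t := &stressTable{entries: entries}
	for _, e := range entries {
		t.sumWeights += e.weight // weights need not sum to 1
	}
	return t
}

func (t *stressTable) choose() string {
	// Draw a point in [0, sumWeights) and walk the cumulative weights.
	v := rand.Float64() * t.sumWeights
	for _, e := range t.entries {
		if v < e.weight {
			return e.name
		}
		v -= e.weight
	}
	return t.entries[len(t.entries)-1].name // guard against float rounding
}

func main() {
	t := createStressTable([]stressEntry{
		{0.7, "put"}, {0.07, "range"}, {0.07, "rangeInterval"},
		{0.07, "delete"}, {0.07, "deleteInterval"},
	})
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[t.choose()]++
	}
	fmt.Println(counts) // roughly 71% put, ~7.1% each of the others
}
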
@@ -255,15 +245,16 @@ func (s *stresser) run(ctx context.Context) {
 			// capability check has not been done (in the beginning)
 			continue
 
-			// default:
-			// errors from stresser.Cancel method:
-			// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
-			// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
+		case rpctypes.ErrTooManyRequests.Error():
+			// hitting the recovering member.
+			continue
+
+		case context.Canceled.Error():
+			// from stresser.Cancel method:
+			return
+
+		case grpc.ErrClientConnClosing.Error():
+			// from stresser.Cancel method:
+			return
 		}
 
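The new cases split errors into transient ones (keep stressing) and shutdown ones (return). Below is a reduced sketch of that decision as a standalone helper; the import paths match the 2016 etcd tree and the grpc-go API of that era (grpc.ErrorDesc and grpc.ErrClientConnClosing have since been deprecated), so treat it as illustrative only:

package stress

import (
	"golang.org/x/net/context"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"google.golang.org/grpc"
)

// keepStressing reports whether the stress loop should retry after err:
// a recovering member rejecting requests is transient, while a canceled
// context or closing client connection means stresser.Cancel was called.
func keepStressing(err error) bool {
	switch grpc.ErrorDesc(err) {
	case rpctypes.ErrTooManyRequests.Error():
		return true // hitting the recovering member; retry
	case context.Canceled.Error(), grpc.ErrClientConnClosing.Error():
		return false // the stresser is being shut down; stop
	}
	return true // other errors are handled by earlier cases in run()
}
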
@@ -335,7 +326,7 @@ func (s *stresserV2) Stress() error {
 	go func() {
 		for {
 			setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
-			key := fmt.Sprintf("foo%d", rand.Intn(s.keySuffixRange))
+			key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
 			_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
 			setcancel()
 			if err == context.Canceled {

@@ -52,6 +52,8 @@ func (tt *tester) runLoop() {
 			}
 			continue
 		}
+		// -1 so that logPrefix doesn't print out 'case'
+		tt.status.setCase(-1)
 
 		revToCompact := max(0, tt.currentRevision-10000)
 		compactN := revToCompact - prevCompactRev

@@ -80,9 +82,6 @@ func (tt *tester) runLoop() {
 }
 
 func (tt *tester) doRound(round int) (bool, error) {
-	// -1 so that logPrefix doesn't print out 'case'
-	defer tt.status.setCase(-1)
-
 	for j, f := range tt.failures {
 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
 		tt.status.setCase(j)