From b1832d2f3ca7bc52d36de0acc437355c0e672ce3 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 8 May 2018 16:18:45 -0700 Subject: [PATCH] functional/tester: configurable stresser weights Signed-off-by: Gyuho Lee --- functional/tester/cluster_read_config.go | 69 ++++++++--------- functional/tester/cluster_test.go | 17 ++++- functional/tester/stresser.go | 94 +++++++++++++++--------- functional/tester/stresser_key.go | 55 +++++++------- functional/tester/stresser_lease.go | 2 +- functional/tester/stresser_runner.go | 4 +- 6 files changed, 136 insertions(+), 105 deletions(-) diff --git a/functional/tester/cluster_read_config.go b/functional/tester/cluster_read_config.go index eeaa5bc8a..186278b22 100644 --- a/functional/tester/cluster_read_config.go +++ b/functional/tester/cluster_read_config.go @@ -52,6 +52,41 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) { } } + if len(clus.Tester.Cases) == 0 { + return nil, errors.New("Cases not found") + } + if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 { + return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv) + } + if clus.Tester.UpdatedDelayLatencyMs == 0 { + clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs + } + + for _, v := range clus.Tester.Cases { + if _, ok := rpcpb.Case_value[v]; !ok { + return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v) + } + } + + for _, s := range clus.Tester.Stressers { + if _, ok := rpcpb.StresserType_value[s.Type]; !ok { + return nil, fmt.Errorf("unknown 'StresserType' %+v", s) + } + } + + for _, v := range clus.Tester.Checkers { + if _, ok := rpcpb.Checker_value[v]; !ok { + return nil, fmt.Errorf("Checker is unknown; got %q", v) + } + } + + if clus.Tester.StressKeySuffixRangeTxn > 100 { + return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn) + } + if clus.Tester.StressKeyTxnOps > 64 { + return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps) + } + for i, mem := range clus.Members { if mem.EtcdExec == "embed" && failpointsEnabled { return nil, errors.New("EtcdExec 'embed' cannot be run with failpoints enabled") @@ -337,39 +372,5 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) { } } - if len(clus.Tester.Cases) == 0 { - return nil, errors.New("Cases not found") - } - if clus.Tester.DelayLatencyMs <= clus.Tester.DelayLatencyMsRv*5 { - return nil, fmt.Errorf("delay latency %d ms must be greater than 5x of delay latency random variable %d ms", clus.Tester.DelayLatencyMs, clus.Tester.DelayLatencyMsRv) - } - if clus.Tester.UpdatedDelayLatencyMs == 0 { - clus.Tester.UpdatedDelayLatencyMs = clus.Tester.DelayLatencyMs - } - - for _, v := range clus.Tester.Cases { - if _, ok := rpcpb.Case_value[v]; !ok { - return nil, fmt.Errorf("%q is not defined in 'rpcpb.Case_value'", v) - } - } - - for _, v := range clus.Tester.Stressers { - if _, ok := rpcpb.Stresser_value[v]; !ok { - return nil, fmt.Errorf("Stresser is unknown; got %q", v) - } - } - for _, v := range clus.Tester.Checkers { - if _, ok := rpcpb.Checker_value[v]; !ok { - return nil, fmt.Errorf("Checker is unknown; got %q", v) - } - } - - if clus.Tester.StressKeySuffixRangeTxn > 100 { - return nil, fmt.Errorf("StressKeySuffixRangeTxn maximum value is 100, got %v", clus.Tester.StressKeySuffixRangeTxn) - } - if clus.Tester.StressKeyTxnOps > 64 { - return nil, fmt.Errorf("StressKeyTxnOps maximum value is 64, got %v", clus.Tester.StressKeyTxnOps) - } - return clus, err } diff --git a/functional/tester/cluster_test.go b/functional/tester/cluster_test.go index 661c00680..5d0a66ed0 100644 --- a/functional/tester/cluster_test.go +++ b/functional/tester/cluster_test.go @@ -232,10 +232,19 @@ func Test_read(t *testing.T) { "NO_FAIL_WITH_STRESS", "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS", }, - FailpointCommands: []string{`panic("etcd-tester")`}, - RunnerExecPath: "./bin/etcd-runner", - ExternalExecPath: "", - Stressers: []string{"KV", "LEASE"}, + FailpointCommands: []string{`panic("etcd-tester")`}, + RunnerExecPath: "./bin/etcd-runner", + ExternalExecPath: "", + Stressers: []*rpcpb.Stresser{ + {Type: "KV_WRITE_SMALL", Weight: 0.35}, + {Type: "KV_WRITE_LARGE", Weight: 0.002}, + {Type: "KV_READ_ONE_KEY", Weight: 0.07}, + {Type: "KV_READ_RANGE", Weight: 0.07}, + {Type: "KV_DELETE_ONE_KEY", Weight: 0.07}, + {Type: "KV_DELETE_RANGE", Weight: 0.07}, + {Type: "KV_TXN_WRITE_DELETE", Weight: 0.35}, + {Type: "LEASE", Weight: 0.0}, + }, Checkers: []string{"KV_HASH", "LEASE_EXPIRE"}, StressKeySize: 100, StressKeySizeLarge: 32769, diff --git a/functional/tester/stresser.go b/functional/tester/stresser.go index b74b84b15..5f4fdea34 100644 --- a/functional/tester/stresser.go +++ b/functional/tester/stresser.go @@ -37,40 +37,60 @@ type Stresser interface { // newStresser creates stresser from a comma separated list of stresser types. func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { - stressers = make([]Stresser, len(clus.Tester.Stressers)) - for i, stype := range clus.Tester.Stressers { + // TODO: Too intensive stressing clients can panic etcd member with + // 'out of memory' error. Put rate limits in server side. + ks := &keyStresser{ + lg: clus.lg, + m: m, + keySize: int(clus.Tester.StressKeySize), + keyLargeSize: int(clus.Tester.StressKeySizeLarge), + keySuffixRange: int(clus.Tester.StressKeySuffixRange), + keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn), + keyTxnOps: int(clus.Tester.StressKeyTxnOps), + clientsN: int(clus.Tester.StressClients), + rateLimiter: clus.rateLimiter, + } + ksExist := false + + for _, s := range clus.Tester.Stressers { clus.lg.Info( "creating stresser", - zap.String("type", stype), + zap.String("type", s.Type), + zap.Float64("weight", s.Weight), zap.String("endpoint", m.EtcdClientEndpoint), ) - - switch stype { - case "KV": - // TODO: Too intensive stressing clients can panic etcd member with - // 'out of memory' error. Put rate limits in server side. - stressers[i] = &keyStresser{ - stype: rpcpb.Stresser_KV, - lg: clus.lg, - m: m, - keySize: int(clus.Tester.StressKeySize), - keyLargeSize: int(clus.Tester.StressKeySizeLarge), - keySuffixRange: int(clus.Tester.StressKeySuffixRange), - keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn), - keyTxnOps: int(clus.Tester.StressKeyTxnOps), - clientsN: int(clus.Tester.StressClients), - rateLimiter: clus.rateLimiter, - } + switch s.Type { + case "KV_WRITE_SMALL": + ksExist = true + ks.weightKVWriteSmall = s.Weight + case "KV_WRITE_LARGE": + ksExist = true + ks.weightKVWriteLarge = s.Weight + case "KV_READ_ONE_KEY": + ksExist = true + ks.weightKVReadOneKey = s.Weight + case "KV_READ_RANGE": + ksExist = true + ks.weightKVReadRange = s.Weight + case "KV_DELETE_ONE_KEY": + ksExist = true + ks.weightKVDeleteOneKey = s.Weight + case "KV_DELETE_RANGE": + ksExist = true + ks.weightKVDeleteRange = s.Weight + case "KV_TXN_WRITE_DELETE": + ksExist = true + ks.weightKVTxnWriteDelete = s.Weight case "LEASE": - stressers[i] = &leaseStresser{ - stype: rpcpb.Stresser_LEASE, + stressers = append(stressers, &leaseStresser{ + stype: rpcpb.StresserType_LEASE, lg: clus.lg, m: m, numLeases: 10, // TODO: configurable keysPerLease: 10, // TODO: configurable rateLimiter: clus.rateLimiter, - } + }) case "ELECTION_RUNNER": reqRate := 100 @@ -83,15 +103,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { "--rounds=0", // runs forever "--req-rate", fmt.Sprintf("%v", reqRate), } - stressers[i] = newRunnerStresser( - rpcpb.Stresser_ELECTION_RUNNER, + stressers = append(stressers, newRunnerStresser( + rpcpb.StresserType_ELECTION_RUNNER, m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate, - ) + )) case "WATCH_RUNNER": reqRate := 100 @@ -105,15 +125,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { "--rounds=0", // runs forever "--req-rate", fmt.Sprintf("%v", reqRate), } - stressers[i] = newRunnerStresser( - rpcpb.Stresser_WATCH_RUNNER, + stressers = append(stressers, newRunnerStresser( + rpcpb.StresserType_WATCH_RUNNER, m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate, - ) + )) case "LOCK_RACER_RUNNER": reqRate := 100 @@ -125,15 +145,15 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { "--rounds=0", // runs forever "--req-rate", fmt.Sprintf("%v", reqRate), } - stressers[i] = newRunnerStresser( - rpcpb.Stresser_LOCK_RACER_RUNNER, + stressers = append(stressers, newRunnerStresser( + rpcpb.StresserType_LOCK_RACER_RUNNER, m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, clus.rateLimiter, reqRate, - ) + )) case "LEASE_RUNNER": args := []string{ @@ -141,16 +161,20 @@ func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) { "--ttl=30", "--endpoints", m.EtcdClientEndpoint, } - stressers[i] = newRunnerStresser( - rpcpb.Stresser_LEASE_RUNNER, + stressers = append(stressers, newRunnerStresser( + rpcpb.StresserType_LEASE_RUNNER, m.EtcdClientEndpoint, clus.lg, clus.Tester.RunnerExecPath, args, clus.rateLimiter, 0, - ) + )) } } + + if ksExist { + return append(stressers, ks) + } return stressers } diff --git a/functional/tester/stresser_key.go b/functional/tester/stresser_key.go index 4889280c3..54efddb28 100644 --- a/functional/tester/stresser_key.go +++ b/functional/tester/stresser_key.go @@ -36,11 +36,18 @@ import ( ) type keyStresser struct { - stype rpcpb.Stresser - lg *zap.Logger + lg *zap.Logger m *rpcpb.Member + weightKVWriteSmall float64 + weightKVWriteLarge float64 + weightKVReadOneKey float64 + weightKVReadRange float64 + weightKVDeleteOneKey float64 + weightKVDeleteRange float64 + weightKVTxnWriteDelete float64 + keySize int keyLargeSize int keySuffixRange int @@ -75,26 +82,16 @@ func (s *keyStresser) Stress() error { s.ctx, s.cancel = context.WithCancel(context.Background()) s.wg.Add(s.clientsN) - var stressEntries = []stressEntry{ - {weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)}, - { - weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize), - f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize), - }, - {weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)}, - {weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)}, - {weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)}, - {weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)}, - } - if s.keyTxnSuffixRange > 0 { - // adjust to make up ±70% of workloads with writes - stressEntries[0].weight = 0.35 - stressEntries = append(stressEntries, stressEntry{ - weight: 0.35, - f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps), - }) - } - s.stressTable = createStressTable(stressEntries) + + s.stressTable = createStressTable([]stressEntry{ + {weight: s.weightKVWriteSmall, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)}, + {weight: s.weightKVWriteLarge, f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize)}, + {weight: s.weightKVReadOneKey, f: newStressRange(s.cli, s.keySuffixRange)}, + {weight: s.weightKVReadRange, f: newStressRangeInterval(s.cli, s.keySuffixRange)}, + {weight: s.weightKVDeleteOneKey, f: newStressDelete(s.cli, s.keySuffixRange)}, + {weight: s.weightKVDeleteRange, f: newStressDeleteInterval(s.cli, s.keySuffixRange)}, + {weight: s.weightKVTxnWriteDelete, f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps)}, + }) s.emu.Lock() s.paused = false @@ -106,7 +103,7 @@ func (s *keyStresser) Stress() error { s.lg.Info( "stress START", - zap.String("stress-type", s.stype.String()), + zap.String("stress-type", "KV"), zap.String("endpoint", s.m.EtcdClientEndpoint), ) return nil @@ -163,7 +160,7 @@ func (s *keyStresser) run() { default: s.lg.Warn( "stress run exiting", - zap.String("stress-type", s.stype.String()), + zap.String("stress-type", "KV"), zap.String("endpoint", s.m.EtcdClientEndpoint), zap.String("error-type", reflect.TypeOf(err).String()), zap.String("error-desc", rpctypes.ErrorDesc(err)), @@ -198,7 +195,7 @@ func (s *keyStresser) Close() map[string]int { s.lg.Info( "stress STOP", - zap.String("stress-type", s.stype.String()), + zap.String("stress-type", "KV"), zap.String("endpoint", s.m.EtcdClientEndpoint), ) return ess @@ -211,13 +208,13 @@ func (s *keyStresser) ModifiedKeys() int64 { type stressFunc func(ctx context.Context) (err error, modifiedKeys int64) type stressEntry struct { - weight float32 + weight float64 f stressFunc } type stressTable struct { entries []stressEntry - sumWeights float32 + sumWeights float64 } func createStressTable(entries []stressEntry) *stressTable { @@ -229,8 +226,8 @@ func createStressTable(entries []stressEntry) *stressTable { } func (st *stressTable) choose() stressFunc { - v := rand.Float32() * st.sumWeights - var sum float32 + v := rand.Float64() * st.sumWeights + var sum float64 var idx int for i := range st.entries { sum += st.entries[i].weight diff --git a/functional/tester/stresser_lease.go b/functional/tester/stresser_lease.go index 8510a0765..673634e10 100644 --- a/functional/tester/stresser_lease.go +++ b/functional/tester/stresser_lease.go @@ -38,7 +38,7 @@ const ( ) type leaseStresser struct { - stype rpcpb.Stresser + stype rpcpb.StresserType lg *zap.Logger m *rpcpb.Member diff --git a/functional/tester/stresser_runner.go b/functional/tester/stresser_runner.go index 18487f402..8fb209801 100644 --- a/functional/tester/stresser_runner.go +++ b/functional/tester/stresser_runner.go @@ -27,7 +27,7 @@ import ( ) type runnerStresser struct { - stype rpcpb.Stresser + stype rpcpb.StresserType etcdClientEndpoint string lg *zap.Logger @@ -42,7 +42,7 @@ type runnerStresser struct { } func newRunnerStresser( - stype rpcpb.Stresser, + stype rpcpb.StresserType, ep string, lg *zap.Logger, cmdStr string,