From 3ce73b70bc0e52c34c49712d0bcf84ae77710e48 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 14 Aug 2017 15:38:25 -0700 Subject: [PATCH] etcd-tester: add txn stresser Signed-off-by: Gyu-Ho Lee --- .../etcd-tester/key_stresser.go | 90 ++++++++++++++++++- tools/functional-tester/etcd-tester/main.go | 22 +++-- .../functional-tester/etcd-tester/stresser.go | 22 +++-- 3 files changed, 116 insertions(+), 18 deletions(-) diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go index 3b29fb199..f4ab77906 100644 --- a/tools/functional-tester/etcd-tester/key_stresser.go +++ b/tools/functional-tester/etcd-tester/key_stresser.go @@ -34,9 +34,11 @@ import ( type keyStresser struct { Endpoint string - keyLargeSize int - keySize int - keySuffixRange int + keyLargeSize int + keySize int + keySuffixRange int + keyTxnSuffixRange int + keyTxnOps int N int @@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error { {weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)}, {weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)}, } + if s.keyTxnSuffixRange > 0 { + // adjust to make up ±70% of workloads with writes + stressEntries[0].weight = 0.24 + stressEntries[1].weight = 0.24 + stressEntries = append(stressEntries, stressEntry{ + weight: 0.24, + f: newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps), + }) + } s.stressTable = createStressTable(stressEntries) for i := 0; i < s.N; i++ { @@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc { } } +func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc { + keys := make([]string, keyTxnSuffixRange) + for i := range keys { + keys[i] = fmt.Sprintf("/k%03d", i) + } + return writeTxn(kvc, keys, txnOps) +} + +func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc { + return func(ctx context.Context) (error, int64) { + ks := make(map[string]struct{}, txnOps) + for len(ks) != txnOps { + ks[keys[rand.Intn(64)]] = struct{}{} + } + selected := make([]string, 0, txnOps) + for k := range ks { + selected = append(selected, k) + } + com, delOp, putOp := getTxnReqs(selected[0], "bar00") + txnReq := &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + } + + // add nested txns if any + for i := 1; i < txnOps; i++ { + k, v := selected[i], fmt.Sprintf("bar%02d", i) + com, delOp, putOp = getTxnReqs(k, v) + nested := &pb.RequestOp{ + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Compare: []*pb.Compare{com}, + Success: []*pb.RequestOp{delOp}, + Failure: []*pb.RequestOp{putOp}, + }, + }, + } + txnReq.Success = append(txnReq.Success, nested) + txnReq.Failure = append(txnReq.Failure, nested) + } + + _, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false)) + return err, int64(txnOps) + } +} + +func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) { + // if key exists (version > 0) + com = &pb.Compare{ + Key: []byte(key), + Target: pb.Compare_VERSION, + Result: pb.Compare_GREATER, + TargetUnion: &pb.Compare_Version{Version: 0}, + } + delOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte(key), + }, + }, + } + putOp = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte(key), + Value: []byte(val), + }, + }, + } + return com, delOp, putOp +} + func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc { return func(ctx context.Context) (error, int64) { _, err := kvc.Range(ctx, &pb.RangeRequest{ diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index c0f77c2f1..0197b1a95 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -47,6 +47,8 @@ func main() { stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.") stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.") stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.") + stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).") + stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per a transaction (max 64).") limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).") exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure") stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.") @@ -120,15 +122,23 @@ func main() { } scfg := stressConfig{ - rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS), - keyLargeSize: int(*stressKeyLargeSize), - keySize: int(*stressKeySize), - keySuffixRange: int(*stressKeySuffixRange), - numLeases: 10, - keysPerLease: 10, + rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS), + keyLargeSize: int(*stressKeyLargeSize), + keySize: int(*stressKeySize), + keySuffixRange: int(*stressKeySuffixRange), + keyTxnSuffixRange: int(*stressKeyTxnSuffixRange), + keyTxnOps: int(*stressKeyTxnOps), + numLeases: 10, + keysPerLease: 10, etcdRunnerPath: *etcdRunnerPath, } + if scfg.keyTxnSuffixRange > 100 { + plog.Fatalf("stress-key-txn-count is maximum 100, got %d", scfg.keyTxnSuffixRange) + } + if scfg.keyTxnOps > 64 { + plog.Fatalf("stress-key-txn-ops is maximum 64, got %d", scfg.keyTxnOps) + } t := &tester{ failures: schedule, diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index f9ab3f9fb..bf0d88214 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -113,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker { } type stressConfig struct { - keyLargeSize int - keySize int - keySuffixRange int + keyLargeSize int + keySize int + keySuffixRange int + keyTxnSuffixRange int + keyTxnOps int numLeases int keysPerLease int @@ -142,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser { // TODO: Too intensive stressers can panic etcd member with // 'out of memory' error. Put rate limits in server side. return &keyStresser{ - Endpoint: m.grpcAddr(), - keyLargeSize: sc.keyLargeSize, - keySize: sc.keySize, - keySuffixRange: sc.keySuffixRange, - N: 100, - rateLimiter: sc.rateLimiter, + Endpoint: m.grpcAddr(), + keyLargeSize: sc.keyLargeSize, + keySize: sc.keySize, + keySuffixRange: sc.keySuffixRange, + keyTxnSuffixRange: sc.keyTxnSuffixRange, + keyTxnOps: sc.keyTxnOps, + N: 100, + rateLimiter: sc.rateLimiter, } case "v2keys": return &v2Stresser{