etcd-tester: add txn stresser

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Author:    Gyu-Ho Lee <gyuhox@gmail.com>
Date:      2017-08-14 15:38:25 -07:00
Committer: Gyuho Lee
parent ee3c81d8d3
commit 3ce73b70bc
3 changed files with 116 additions and 18 deletions


@@ -34,9 +34,11 @@ import (
 type keyStresser struct {
 	Endpoint string

-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
+	keyLargeSize      int
+	keySize           int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int

 	N int
@@ -77,6 +79,15 @@ func (s *keyStresser) Stress() error {
 		{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
 		{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
 	}
+	if s.keyTxnSuffixRange > 0 {
+		// adjust to make up ±70% of workloads with writes
+		stressEntries[0].weight = 0.24
+		stressEntries[1].weight = 0.24
+		stressEntries = append(stressEntries, stressEntry{
+			weight: 0.24,
+			f:      newStressTxn(kvc, s.keyTxnSuffixRange, s.keyTxnOps),
+		})
+	}
 	s.stressTable = createStressTable(stressEntries)

 	for i := 0; i < s.N; i++ {
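
When the txn stresser is enabled, the first two entries are rebalanced to 0.24 each and the new txn entry contributes another 0.24, keeping write-style operations at roughly 70% of the workload mix. createStressTable itself is outside this diff; the following is a minimal sketch of the cumulative-weight selection such a table presumably performs (all names here are hypothetical, for illustration only, and the real tester implementation may differ):

// Hypothetical sketch of weighted selection behind createStressTable.
package main

import (
	"fmt"
	"math/rand"
)

type entry struct {
	weight float64
	name   string // stands in for a stressFunc
}

type table struct {
	sum     float64
	entries []entry
}

func newTable(entries []entry) *table {
	t := &table{entries: entries}
	for _, e := range entries {
		t.sum += e.weight // weights need not add up to 1.0
	}
	return t
}

// pick returns an entry with probability weight/sum.
func (t *table) pick() string {
	v := rand.Float64() * t.sum
	for _, e := range t.entries {
		if v < e.weight {
			return e.name
		}
		v -= e.weight
	}
	return t.entries[len(t.entries)-1].name
}

func main() {
	t := newTable([]entry{
		{0.24, "put"}, {0.24, "bigPut"}, {0.24, "txn"},
		{0.07, "delete"}, {0.07, "deleteInterval"},
	})
	fmt.Println(t.pick())
}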
@@ -202,6 +213,79 @@ func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
 	}
 }

+func newStressTxn(kvc pb.KVClient, keyTxnSuffixRange, txnOps int) stressFunc {
+	keys := make([]string, keyTxnSuffixRange)
+	for i := range keys {
+		keys[i] = fmt.Sprintf("/k%03d", i)
+	}
+	return writeTxn(kvc, keys, txnOps)
+}
+
+func writeTxn(kvc pb.KVClient, keys []string, txnOps int) stressFunc {
+	return func(ctx context.Context) (error, int64) {
+		ks := make(map[string]struct{}, txnOps)
+		for len(ks) != txnOps {
+			ks[keys[rand.Intn(len(keys))]] = struct{}{}
+		}
+		selected := make([]string, 0, txnOps)
+		for k := range ks {
+			selected = append(selected, k)
+		}
+
+		com, delOp, putOp := getTxnReqs(selected[0], "bar00")
+		txnReq := &pb.TxnRequest{
+			Compare: []*pb.Compare{com},
+			Success: []*pb.RequestOp{delOp},
+			Failure: []*pb.RequestOp{putOp},
+		}
+
+		// add nested txns if any
+		for i := 1; i < txnOps; i++ {
+			k, v := selected[i], fmt.Sprintf("bar%02d", i)
+			com, delOp, putOp = getTxnReqs(k, v)
+			nested := &pb.RequestOp{
+				Request: &pb.RequestOp_RequestTxn{
+					RequestTxn: &pb.TxnRequest{
+						Compare: []*pb.Compare{com},
+						Success: []*pb.RequestOp{delOp},
+						Failure: []*pb.RequestOp{putOp},
+					},
+				},
+			}
+			txnReq.Success = append(txnReq.Success, nested)
+			txnReq.Failure = append(txnReq.Failure, nested)
+		}
+
+		_, err := kvc.Txn(ctx, txnReq, grpc.FailFast(false))
+		return err, int64(txnOps)
+	}
+}
+
+func getTxnReqs(key, val string) (com *pb.Compare, delOp *pb.RequestOp, putOp *pb.RequestOp) {
+	// if key exists (version > 0)
+	com = &pb.Compare{
+		Key:         []byte(key),
+		Target:      pb.Compare_VERSION,
+		Result:      pb.Compare_GREATER,
+		TargetUnion: &pb.Compare_Version{Version: 0},
+	}
+	delOp = &pb.RequestOp{
+		Request: &pb.RequestOp_RequestDeleteRange{
+			RequestDeleteRange: &pb.DeleteRangeRequest{
+				Key: []byte(key),
+			},
+		},
+	}
+	putOp = &pb.RequestOp{
+		Request: &pb.RequestOp_RequestPut{
+			RequestPut: &pb.PutRequest{
+				Key:   []byte(key),
+				Value: []byte(val),
+			},
+		},
+	}
+	return com, delOp, putOp
+}
+
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
 	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
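
Each getTxnReqs unit is a compare-delete-put: if the key already exists (version > 0) the transaction deletes it, otherwise it creates it. For reference, a sketch of the same single-key transaction issued through clientv3 instead of raw pb types; the endpoint and key below are placeholders:

// Sketch: one compare-delete-put unit via clientv3 (import path as of etcd 3.x).
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// If the key exists (version > 0), delete it; otherwise create it.
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.Version("/k000"), ">", 0)).
		Then(clientv3.OpDelete("/k000")).
		Else(clientv3.OpPut("/k000", "bar00")).
		Commit()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("key existed and was deleted: %v", resp.Succeeded)
}

The nested transactions that writeTxn appends to both branches can likewise be expressed with clientv3.OpTxn, assuming a client and server version that support nested txns (etcd 3.3+).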


@@ -47,6 +47,8 @@ func main() {
 	stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
 	stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
 	stressKeySuffixRange := flag.Uint("stress-key-count", 250000, "the count of key range written into etcd.")
+	stressKeyTxnSuffixRange := flag.Uint("stress-key-txn-count", 100, "the count of key range written into etcd txn (max 100).")
+	stressKeyTxnOps := flag.Uint("stress-key-txn-ops", 1, "number of operations per transaction (max 64).")
 	limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
 	exitOnFailure := flag.Bool("exit-on-failure", false, "exit tester on first failure")
 	stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
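With the defaults above, stress-key-txn-ops is 1, so each stress transaction is a single compare-delete-put with no nested txns; each additional op adds one nested txn to both the Success and Failure branches.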
@@ -120,15 +122,23 @@ func main() {
 	}

 	scfg := stressConfig{
-		rateLimiter:    rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
-		keyLargeSize:   int(*stressKeyLargeSize),
-		keySize:        int(*stressKeySize),
-		keySuffixRange: int(*stressKeySuffixRange),
-		numLeases:      10,
-		keysPerLease:   10,
+		rateLimiter:       rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
+		keyLargeSize:      int(*stressKeyLargeSize),
+		keySize:           int(*stressKeySize),
+		keySuffixRange:    int(*stressKeySuffixRange),
+		keyTxnSuffixRange: int(*stressKeyTxnSuffixRange),
+		keyTxnOps:         int(*stressKeyTxnOps),
+		numLeases:         10,
+		keysPerLease:      10,

 		etcdRunnerPath: *etcdRunnerPath,
 	}
+	if scfg.keyTxnSuffixRange > 100 {
+		plog.Fatalf("stress-key-txn-count must be at most 100, got %d", scfg.keyTxnSuffixRange)
+	}
+	if scfg.keyTxnOps > 64 {
+		plog.Fatalf("stress-key-txn-ops must be at most 64, got %d", scfg.keyTxnOps)
+	}

 	t := &tester{
 		failures: schedule,
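
Note that the two bounds are checked independently: nothing enforces keyTxnOps <= keyTxnSuffixRange, yet writeTxn draws distinct keys until it has txnOps of them, so a configuration such as --stress-key-txn-count=10 --stress-key-txn-ops=32 would pass validation and then spin forever in the key-selection loop. The defaults (100 keys, 1 op) and the 64/100 caps keep the common cases safe.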


@@ -113,9 +113,11 @@ func (cs *compositeStresser) Checker() Checker {
 }

 type stressConfig struct {
-	keyLargeSize   int
-	keySize        int
-	keySuffixRange int
+	keyLargeSize      int
+	keySize           int
+	keySuffixRange    int
+	keyTxnSuffixRange int
+	keyTxnOps         int

 	numLeases    int
 	keysPerLease int
@@ -142,12 +144,14 @@ func NewStresser(s string, sc *stressConfig, m *member) Stresser {
 		// TODO: Too intensive stressers can panic etcd member with
 		// 'out of memory' error. Put rate limits in server side.
 		return &keyStresser{
-			Endpoint:       m.grpcAddr(),
-			keyLargeSize:   sc.keyLargeSize,
-			keySize:        sc.keySize,
-			keySuffixRange: sc.keySuffixRange,
-			N:              100,
-			rateLimiter:    sc.rateLimiter,
+			Endpoint:          m.grpcAddr(),
+			keyLargeSize:      sc.keyLargeSize,
+			keySize:           sc.keySize,
+			keySuffixRange:    sc.keySuffixRange,
+			keyTxnSuffixRange: sc.keyTxnSuffixRange,
+			keyTxnOps:         sc.keyTxnOps,
+			N:                 100,
+			rateLimiter:       sc.rateLimiter,
 		}
 	case "v2keys":
 		return &v2Stresser{