From 107d7b663c432332e4ddc67518376963bc1e2e9f Mon Sep 17 00:00:00 2001
From: fanmin shi
Date: Fri, 4 Nov 2016 16:02:18 -0700
Subject: [PATCH] etcd-tester: changed compaction timeout calculation

The functional tester sometimes experiences timeouts during the
compaction phase. This change bases the timeout calculation on the
number of entries created and deleted.

FIX #6805
---
 .../etcd-tester/key_stresser.go               | 51 +++++++++----------
 .../etcd-tester/lease_stresser.go             | 21 +++++---
 .../functional-tester/etcd-tester/stresser.go | 16 +++---
 tools/functional-tester/etcd-tester/tester.go | 24 ++++-----
 .../etcd-tester/v2_stresser.go                | 22 +++-----
 5 files changed, 63 insertions(+), 71 deletions(-)

diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go
index b665b16d2..bd818b890 100644
--- a/tools/functional-tester/etcd-tester/key_stresser.go
+++ b/tools/functional-tester/etcd-tester/key_stresser.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
@@ -46,9 +47,8 @@ type keyStresser struct {
 
 	cancel func()
 	conn   *grpc.ClientConn
-
-	success int
-	failure int
+	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
+	atomicModifiedKeys int64
 
 	stressTable *stressTable
 }
@@ -100,18 +100,13 @@ func (s *keyStresser) run(ctx context.Context) {
 		// and immediate leader election. Find out what other cases this
 		// could be timed out.
 		sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
-		err := s.stressTable.choose()(sctx)
+		err, modifiedKeys := s.stressTable.choose()(sctx)
 		scancel()
 		if err == nil {
-			s.mu.Lock()
-			s.success++
-			s.mu.Unlock()
+			atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
 			continue
 		}
 
-		s.mu.Lock()
-		s.failure++
-		s.mu.Unlock()
 		switch grpc.ErrorDesc(err) {
 		case context.DeadlineExceeded.Error():
 			// This retries when request is triggered at the same time as
@@ -140,8 +135,7 @@
 			// from stresser.Cancel method:
 			return
 		default:
-			su, fa := s.Report()
-			plog.Errorf("keyStresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
+			plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err)
 			return
 		}
 	}
@@ -154,15 +148,13 @@ func (s *keyStresser) Cancel() {
 	plog.Infof("keyStresser %q is canceled", s.Endpoint)
 }
 
-func (s *keyStresser) Report() (int, int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
+func (s *keyStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&s.atomicModifiedKeys)
 }
 
 func (s *keyStresser) Checker() Checker { return nil }
 
-type stressFunc func(ctx context.Context) error
+type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
 
 type stressEntry struct {
 	weight float32
@@ -197,53 +189,56 @@ func (st *stressTable) choose() stressFunc {
 }
 
 func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Put(ctx, &pb.PutRequest{
 			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 			Value: randBytes(keySize),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
+		resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		if err == nil {
+			return nil, resp.Deleted
+		}
+		return err, 0
 	}
 }
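The counting pattern introduced above (each stress operation returns how many keys it modified, and only successful counts are folded into a single int64 via sync/atomic) can be seen in isolation below. This is a minimal standalone sketch, not part of the patch; demoPut is a hypothetical stand-in for newStressPut.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// stressFunc mirrors the patched signature: each operation reports how
// many keys it created or deleted alongside its error.
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)

// demoPut is a hypothetical stand-in for newStressPut: one successful
// put modifies exactly one key.
func demoPut(ctx context.Context) (error, int64) { return nil, 1 }

func main() {
	var modified int64 // aggregated lock-free, as keyStresser now does
	for i := 0; i < 5; i++ {
		if err, n := demoPut(context.Background()); err == nil {
			atomic.AddInt64(&modified, n) // count successful operations only
		}
	}
	fmt.Println("modified keys:", atomic.LoadInt64(&modified)) // prints 5
}

Replacing the mutex-guarded success/failure pair with one atomic counter keeps the hot path cheap; the value is read only when the tester sizes the compaction timeout.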
diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go
index 1e95f4693..6a251cb90 100644
--- a/tools/functional-tester/etcd-tester/lease_stresser.go
+++ b/tools/functional-tester/etcd-tester/lease_stresser.go
@@ -18,6 +18,8 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/coreos/etcd/clientv3"
 
@@ -43,11 +45,10 @@ type leaseStresser struct {
 	ctx context.Context
 
 	rateLimiter *rate.Limiter
-
-	success      int
-	failure      int
-	numLeases    int
-	keysPerLease int
+	// atomicModifiedKey records the number of keys created and deleted during a test case
+	atomicModifiedKey int64
+	numLeases         int
+	keysPerLease      int
 
 	aliveLeases   *atomicLeases
 	revokedLeases *atomicLeases
@@ -147,7 +148,9 @@ func (ls *leaseStresser) run() {
 	defer ls.runWg.Done()
 	ls.restartKeepAlives()
 	for {
-		err := ls.rateLimiter.WaitN(ls.ctx, ls.numLeases*ls.keysPerLease)
+		// the number of keys created and deleted is roughly 2x the number of created keys per iteration.
+		// the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens, where each token represents a create/delete operation on a key.
+		err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
 		if err == context.Canceled {
 			return
 		}
@@ -366,6 +369,8 @@ func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
 	txn := &pb.TxnRequest{Success: txnPuts}
 	_, err := ls.kvc.Txn(ls.ctx, txn)
 	if err == nil {
+		// since all created keys will be deleted too, the number of operations on keys is roughly 2x the number of created keys
+		atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
 		return nil
 	}
 	if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
@@ -400,8 +405,8 @@ func (ls *leaseStresser) Cancel() {
 	plog.Infof("lease stresser %q is canceled", ls.endpoint)
 }
 
-func (ls *leaseStresser) Report() (int, int) {
-	return ls.success, ls.failure
+func (ls *leaseStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&ls.atomicModifiedKey)
 }
 
 func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }
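The doubled token budget in run() can be exercised on its own with golang.org/x/time/rate, the limiter package the tester uses. The values below are illustrative only; note that WaitN requires the requested token count to be at most the limiter's burst.

package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	const (
		numLeases    = 10 // illustrative values, not from the patch
		keysPerLease = 5
	)
	// Every created key is later deleted, so one iteration performs
	// roughly 2*numLeases*keysPerLease key operations; the burst must
	// cover the largest single WaitN request.
	limiter := rate.NewLimiter(rate.Limit(1000), 2*numLeases*keysPerLease)
	if err := limiter.WaitN(context.Background(), 2*numLeases*keysPerLease); err != nil {
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("reserved tokens for one create+delete iteration")
}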
diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go
index 7732f76bd..ea8968d58 100644
--- a/tools/functional-tester/etcd-tester/stresser.go
+++ b/tools/functional-tester/etcd-tester/stresser.go
@@ -30,8 +30,8 @@ type Stresser interface {
 	Stress() error
 	// Cancel cancels the stress test on the etcd cluster
 	Cancel()
-	// Report reports the success and failure of the stress test
-	Report() (success int, failure int)
+	// ModifiedKeys reports the number of keys created and deleted by the stresser
+	ModifiedKeys() int64
 	// Checker returns an invariant checker for after the stresser is canceled.
 	Checker() Checker
 }
@@ -44,8 +44,8 @@ type nopStresser struct {
 func (s *nopStresser) Stress() error { return nil }
 func (s *nopStresser) Cancel()       {}
 
-func (s *nopStresser) Report() (int, int) {
-	return int(time.Since(s.start).Seconds()) * s.qps, 0
+func (s *nopStresser) ModifiedKeys() int64 {
+	return 0
 }
 
 func (s *nopStresser) Checker() Checker { return nil }
@@ -79,13 +79,11 @@ func (cs *compositeStresser) Cancel() {
 	wg.Wait()
 }
 
-func (cs *compositeStresser) Report() (succ int, fail int) {
+func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
 	for _, stress := range cs.stressers {
-		s, f := stress.Report()
-		succ += s
-		fail += f
+		modifiedKey += stress.ModifiedKeys()
 	}
-	return succ, fail
+	return modifiedKey
 }
 
 func (cs *compositeStresser) Checker() Checker {
diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go
index f76e1fa94..62b3ec9bc 100644
--- a/tools/functional-tester/etcd-tester/tester.go
+++ b/tools/functional-tester/etcd-tester/tester.go
@@ -48,11 +48,11 @@ func (tt *tester) runLoop() {
 	}
 
 	if err := tt.resetStressCheck(); err != nil {
-		plog.Errorf("%s failed to start stresser (%v)", err)
+		plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
 		return
 	}
 
-	var prevCompactRev int64
+	var preModifiedKey int64
 	for round := 0; round < tt.limit || tt.limit == -1; round++ {
 		tt.status.setRound(round)
 		roundTotalCounter.Inc()
@@ -62,27 +62,27 @@ func (tt *tester) runLoop() {
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 			continue
 		}
 		// -1 so that logPrefix doesn't print out 'case'
 		tt.status.setCase(-1)
 		revToCompact := max(0, tt.currentRevision-10000)
-		compactN := revToCompact - prevCompactRev
+		currentModifiedKey := tt.stresser.ModifiedKeys()
+		modifiedKey := currentModifiedKey - preModifiedKey
+		preModifiedKey = currentModifiedKey
 		timeout := 10 * time.Second
-		if compactN > 0 {
-			timeout += time.Duration(compactN/compactQPS) * time.Second
-		}
-		prevCompactRev = revToCompact
-
-		plog.Printf("%s compacting %d entries (timeout %v)", tt.logPrefix(), compactN, timeout)
+		timeout += time.Duration(modifiedKey/compactQPS) * time.Second
+		plog.Printf("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
 
 		if err := tt.compact(revToCompact, timeout); err != nil {
 			plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := tt.defrag(); err != nil {
@@ -257,4 +257,4 @@ func (tt *tester) resetStressCheck() error {
 	return tt.startStresser()
 }
 
-func (tt *tester) Report() (success, failure int) { return tt.stresser.Report() }
+func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }
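The new formula is easiest to see with concrete numbers. A small sketch of the timeout arithmetic follows; the compactQPS value here is hypothetical, since the real constant is defined elsewhere in the tester and does not appear in this patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	const compactQPS = 1000       // hypothetical; the tester defines its own constant
	var modifiedKey int64 = 50000 // keys created and deleted since the last compaction

	// Base timeout plus one second per compactQPS modifications,
	// mirroring the patched tester.go above.
	timeout := 10 * time.Second
	timeout += time.Duration(modifiedKey/compactQPS) * time.Second
	fmt.Println(timeout) // 1m0s
}

Tying the budget to the number of keys actually created and deleted, as counted by the stressers, keeps the timeout proportional to the work the compactor has to do.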
diff --git a/tools/functional-tester/etcd-tester/v2_stresser.go b/tools/functional-tester/etcd-tester/v2_stresser.go
index 24c1126dc..5ee1729ed 100644
--- a/tools/functional-tester/etcd-tester/v2_stresser.go
+++ b/tools/functional-tester/etcd-tester/v2_stresser.go
@@ -21,6 +21,7 @@ import (
 	"net"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -40,9 +41,8 @@ type v2Stresser struct {
 
 	wg sync.WaitGroup
 
-	mu      sync.Mutex
-	failure int
-	success int
+	mu                sync.Mutex
+	atomicModifiedKey int64
 
 	cancel func()
 }
@@ -84,17 +84,13 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
 		setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
 		key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
 		_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
+		if err == nil {
+			atomic.AddInt64(&s.atomicModifiedKey, 1)
+		}
 		setcancel()
 		if err == context.Canceled {
 			return
 		}
-		s.mu.Lock()
-		if err != nil {
-			s.failure++
-		} else {
-			s.success++
-		}
-		s.mu.Unlock()
 	}
 }
@@ -103,10 +99,8 @@ func (s *v2Stresser) Cancel() {
 	s.wg.Wait()
 }
 
-func (s *v2Stresser) Report() (success int, failure int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
+func (s *v2Stresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&s.atomicModifiedKey)
 }
 
 func (s *v2Stresser) Checker() Checker { return nil }