Merge pull request #6808 from fanminshi/functional-tester-compaction-deadline-fix

etcd-tester: increase compaction timeout limit

Commit: 0f5d9f00ad
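In short: each stresser stops reporting success/failure counts and instead exposes ModifiedKeys(), the number of keys it created and deleted; the tester then sizes the compaction timeout from the number of modifications since the last compaction rather than from the revision delta, so heavy stress rounds no longer trip the old fixed deadline.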
key_stresser.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
@@ -46,9 +47,8 @@ type keyStresser struct {
 	cancel func()
 	conn   *grpc.ClientConn
 
-	success int
-	failure int
+	// atomicModifiedKeys records the number of keys created and deleted by the stresser.
+	atomicModifiedKeys int64
 
 	stressTable *stressTable
 }
@@ -100,18 +100,13 @@ func (s *keyStresser) run(ctx context.Context) {
 		// and immediate leader election. Find out what other cases this
 		// could be timed out.
 		sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
-		err := s.stressTable.choose()(sctx)
+		err, modifiedKeys := s.stressTable.choose()(sctx)
 		scancel()
 		if err == nil {
-			s.mu.Lock()
-			s.success++
-			s.mu.Unlock()
+			atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
 			continue
 		}
 
-		s.mu.Lock()
-		s.failure++
-		s.mu.Unlock()
 		switch grpc.ErrorDesc(err) {
 		case context.DeadlineExceeded.Error():
 			// This retries when request is triggered at the same time as
@@ -140,8 +135,7 @@ func (s *keyStresser) run(ctx context.Context) {
 			// from stresser.Cancel method:
 			return
 		default:
-			su, fa := s.Report()
-			plog.Errorf("keyStresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
+			plog.Errorf("keyStresser %v exited with error (%v)", s.Endpoint, err)
 			return
 		}
 	}
@@ -154,15 +148,13 @@ func (s *keyStresser) Cancel() {
 	plog.Infof("keyStresser %q is canceled", s.Endpoint)
 }
 
-func (s *keyStresser) Report() (int, int) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.success, s.failure
+func (s *keyStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&s.atomicModifiedKeys)
 }
 
 func (s *keyStresser) Checker() Checker { return nil }
 
-type stressFunc func(ctx context.Context) error
+type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
 
 type stressEntry struct {
 	weight float32
@@ -197,53 +189,56 @@ func (st *stressTable) choose() stressFunc {
 }
 
 func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Put(ctx, &pb.PutRequest{
 			Key:   []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 			Value: randBytes(keySize),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
 		_, err := kvc.Range(ctx, &pb.RangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		return err, 0
 	}
 }
 
 func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
 		}, grpc.FailFast(false))
-		return err
+		return err, 1
 	}
 }
 
 func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
-	return func(ctx context.Context) error {
+	return func(ctx context.Context) (error, int64) {
 		start := rand.Intn(keySuffixRange)
 		end := start + 500
-		_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
+		resp, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
 			Key:      []byte(fmt.Sprintf("foo%016x", start)),
 			RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
 		}, grpc.FailFast(false))
-		return err
+		if err == nil {
+			return nil, resp.Deleted
+		}
+		return err, 0
 	}
 }
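If it helps to see the pattern outside the diff: below is a minimal, self-contained sketch of a weighted stress table whose operations report their own modified-key counts, which the caller rolls into one atomic counter. The stubbed put/get/del closures stand in for the real etcd RPCs, and the chooser's field names are assumptions, not the project's exact code.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync/atomic"
)

// stressFunc mirrors the new signature in the diff: each operation reports
// how many keys it modified so the caller can track total modifications.
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)

type stressEntry struct {
	weight float32
	f      stressFunc
}

// stressTable picks an operation with probability proportional to its weight,
// in the spirit of the diff's stressTable.choose.
type stressTable struct {
	entries    []stressEntry
	sumWeights float32
}

func (st *stressTable) choose() stressFunc {
	v := rand.Float32() * st.sumWeights
	var idx int
	for i := range st.entries {
		v -= st.entries[i].weight
		if v <= 0 {
			idx = i
			break
		}
	}
	return st.entries[idx].f
}

func main() {
	// Stubbed operations standing in for etcd Put/Range/DeleteRange calls.
	put := func(ctx context.Context) (error, int64) { return nil, 1 } // one key written
	get := func(ctx context.Context) (error, int64) { return nil, 0 } // reads modify nothing
	del := func(ctx context.Context) (error, int64) { return nil, 1 } // one key deleted

	st := &stressTable{
		entries:    []stressEntry{{1, put}, {1, get}, {1, del}},
		sumWeights: 3,
	}

	var atomicModifiedKeys int64
	for i := 0; i < 1000; i++ {
		if err, n := st.choose()(context.Background()); err == nil {
			atomic.AddInt64(&atomicModifiedKeys, n)
		}
	}
	fmt.Println("modified keys:", atomic.LoadInt64(&atomicModifiedKeys))
}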
lease_stresser.go

@@ -18,6 +18,8 @@ import (
 	"fmt"
 	"math/rand"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/coreos/etcd/clientv3"
@@ -43,11 +45,10 @@ type leaseStresser struct {
 	ctx context.Context
 
 	rateLimiter *rate.Limiter
 
-	success      int
-	failure      int
-	numLeases    int
-	keysPerLease int
+	// atomicModifiedKey records the number of keys created and deleted during a test case
+	atomicModifiedKey int64
+	numLeases         int
+	keysPerLease      int
 
 	aliveLeases   *atomicLeases
 	revokedLeases *atomicLeases
@@ -147,7 +148,9 @@ func (ls *leaseStresser) run() {
 	defer ls.runWg.Done()
 	ls.restartKeepAlives()
 	for {
-		err := ls.rateLimiter.WaitN(ls.ctx, ls.numLeases*ls.keysPerLease)
+		// the number of keys created and deleted is roughly 2x the number of created keys for an iteration.
+		// the rateLimiter therefore consumes 2x ls.numLeases*ls.keysPerLease tokens where each token represents a create/delete operation for key.
+		err := ls.rateLimiter.WaitN(ls.ctx, 2*ls.numLeases*ls.keysPerLease)
 		if err == context.Canceled {
 			return
 		}
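The new comment encodes the token math: one iteration creates ls.numLeases*ls.keysPerLease keys and later deletes them all, so the limiter must be charged twice per key. A small sketch of that accounting with golang.org/x/time/rate (the 100 ops/sec budget, burst, and counts are arbitrary stand-ins):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	numLeases, keysPerLease := 10, 5

	// Hypothetical budget: 100 key operations per second, burst of 100.
	limiter := rate.NewLimiter(rate.Limit(100), 100)

	start := time.Now()
	for i := 0; i < 3; i++ {
		// As in the diff: every created key is later deleted, so one iteration
		// costs roughly 2x numLeases*keysPerLease key operations. Reserving 2x
		// tokens keeps the limiter's accounting in line with the actual load.
		if err := limiter.WaitN(context.Background(), 2*numLeases*keysPerLease); err != nil {
			return // context canceled
		}
		fmt.Printf("iteration %d at %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}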
@@ -366,6 +369,8 @@ func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
 	txn := &pb.TxnRequest{Success: txnPuts}
 	_, err := ls.kvc.Txn(ls.ctx, txn)
 	if err == nil {
+		// since all created keys will be deleted too, the number of operations on keys will be roughly 2x the number of created keys
+		atomic.AddInt64(&ls.atomicModifiedKey, 2*int64(ls.keysPerLease))
 		return nil
 	}
 	if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
@@ -400,8 +405,8 @@ func (ls *leaseStresser) Cancel() {
 	plog.Infof("lease stresser %q is canceled", ls.endpoint)
 }
 
-func (ls *leaseStresser) Report() (int, int) {
-	return ls.success, ls.failure
+func (ls *leaseStresser) ModifiedKeys() int64 {
+	return atomic.LoadInt64(&ls.atomicModifiedKey)
 }
 
 func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }
stresser.go

@@ -30,8 +30,8 @@ type Stresser interface {
 	Stress() error
 	// Cancel cancels the stress test on the etcd cluster
 	Cancel()
-	// Report reports the success and failure of the stress test
-	Report() (success int, failure int)
+	// ModifiedKeys reports the number of keys created and deleted by stresser
+	ModifiedKeys() int64
 	// Checker returns an invariant checker for after the stresser is canceled.
 	Checker() Checker
 }
@@ -44,8 +44,8 @@ type nopStresser struct {
 
 func (s *nopStresser) Stress() error { return nil }
 func (s *nopStresser) Cancel()       {}
-func (s *nopStresser) Report() (int, int) {
-	return int(time.Since(s.start).Seconds()) * s.qps, 0
+func (s *nopStresser) ModifiedKeys() int64 {
+	return 0
 }
 func (s *nopStresser) Checker() Checker { return nil }
@@ -79,13 +79,11 @@ func (cs *compositeStresser) Cancel() {
 	wg.Wait()
 }
 
-func (cs *compositeStresser) Report() (succ int, fail int) {
+func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
 	for _, stress := range cs.stressers {
-		s, f := stress.Report()
-		succ += s
-		fail += f
+		modifiedKey += stress.ModifiedKeys()
 	}
-	return succ, fail
+	return modifiedKey
 }
 
 func (cs *compositeStresser) Checker() Checker {
tester.go

@@ -48,11 +48,11 @@ func (tt *tester) runLoop() {
 	}
 
 	if err := tt.resetStressCheck(); err != nil {
-		plog.Errorf("%s failed to start stresser (%v)", err)
+		plog.Errorf("%s failed to start stresser (%v)", tt.logPrefix(), err)
 		return
 	}
 
-	var prevCompactRev int64
+	var preModifiedKey int64
 	for round := 0; round < tt.limit || tt.limit == -1; round++ {
 		tt.status.setRound(round)
 		roundTotalCounter.Inc()
@@ -62,27 +62,27 @@ func (tt *tester) runLoop() {
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 			continue
 		}
 		// -1 so that logPrefix doesn't print out 'case'
 		tt.status.setCase(-1)
 
 		revToCompact := max(0, tt.currentRevision-10000)
-		compactN := revToCompact - prevCompactRev
+		currentModifiedKey := tt.stresser.ModifiedKeys()
+		modifiedKey := currentModifiedKey - preModifiedKey
+		preModifiedKey = currentModifiedKey
 		timeout := 10 * time.Second
-		if compactN > 0 {
-			timeout += time.Duration(compactN/compactQPS) * time.Second
-		}
-		prevCompactRev = revToCompact
-
-		plog.Printf("%s compacting %d entries (timeout %v)", tt.logPrefix(), compactN, timeout)
+		timeout += time.Duration(modifiedKey/compactQPS) * time.Second
+		plog.Printf("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout)
 		if err := tt.compact(revToCompact, timeout); err != nil {
 			plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err)
 			if tt.cleanup() != nil {
 				return
 			}
-			prevCompactRev = 0 // reset after clean up
+			// reset preModifiedKey after clean up
+			preModifiedKey = 0
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := tt.defrag(); err != nil {
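This hunk is the heart of the fix: the compaction deadline now grows with the number of keys actually modified since the last round instead of the revision delta. A hedged restatement of the arithmetic (the function name and the compactQPS value here are illustrative, not taken from the source):

package main

import (
	"fmt"
	"time"
)

// compactTimeout mirrors the diff's arithmetic: a 10s floor plus one second
// for every compactQPS keys modified since the last compaction.
func compactTimeout(modifiedKey, compactQPS int64) time.Duration {
	timeout := 10 * time.Second
	timeout += time.Duration(modifiedKey/compactQPS) * time.Second
	return timeout
}

func main() {
	// e.g. 250k modifications at an assumed 1000 keys/sec of compaction
	// throughput extends the deadline from 10s to 10s + 250s.
	fmt.Println(compactTimeout(250000, 1000)) // 4m20s
}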
@@ -257,4 +257,4 @@ func (tt *tester) resetStressCheck() error {
 	return tt.startStresser()
 }
 
-func (tt *tester) Report() (success, failure int) { return tt.stresser.Report() }
+func (tt *tester) Report() int64 { return tt.stresser.ModifiedKeys() }
v2_stresser.go

@@ -21,6 +21,7 @@ import (
 	"net"
 	"net/http"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -40,9 +41,8 @@ type v2Stresser struct {
 	wg sync.WaitGroup
 
-	mu      sync.Mutex
-	failure int
-	success int
+	mu                sync.Mutex
+	atomicModifiedKey int64
 
 	cancel func()
 }
@@ -84,17 +84,13 @@ func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
 		setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
 		key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
 		_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
+		if err == nil {
+			atomic.AddInt64(&s.atomicModifiedKey, 1)
+		}
 		setcancel()
 		if err == context.Canceled {
 			return
 		}
-		s.mu.Lock()
-		if err != nil {
-			s.failure++
-		} else {
-			s.success++
-		}
-		s.mu.Unlock()
 	}
 }
|
||||
@ -103,10 +99,8 @@ func (s *v2Stresser) Cancel() {
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Report() (success int, failure int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.success, s.failure
|
||||
func (s *v2Stresser) ModifiedKeys() int64 {
|
||||
return atomic.LoadInt64(&s.atomicModifiedKey)
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Checker() Checker { return nil }
|
||||
|