diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 04f2d9054..9f3e40cbc 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -320,7 +320,7 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) return revs, hashes, nil } -func (c *cluster) compactKV(rev int64) (err error) { +func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) { if rev <= 0 { return nil } @@ -332,7 +332,7 @@ func (c *cluster) compactKV(rev int64) (err error) { continue } kvc := pb.NewKVClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) _, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}) cancel() conn.Close() diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 6647b210e..1b4be937b 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -29,6 +29,10 @@ type tester struct { currentRevision int64 } +// compactQPS is rough number of compact requests per second. +// Previous tests showed etcd can compact about 60,000 entries per second. +const compactQPS = 50000 + func (tt *tester) runLoop() { tt.status.Since = time.Now() tt.status.RoundLimit = tt.limit @@ -42,7 +46,10 @@ func (tt *tester) runLoop() { tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case' roundTotalCounter.Inc() - var failed bool + var ( + prevCompactRev int64 + failed bool + ) for j, f := range tt.failures { caseTotalCounter.WithLabelValues(f.Desc()).Inc() tt.status.setCase(j) @@ -105,7 +112,15 @@ func (tt *tester) runLoop() { } revToCompact := max(0, tt.currentRevision-10000) - if err := tt.compact(revToCompact); err != nil { + compactN := revToCompact - prevCompactRev + timeout := 10 * time.Second + if prevCompactRev != 0 && compactN > 0 { + timeout += time.Duration(compactN/compactQPS) * time.Second + } + prevCompactRev = revToCompact + + plog.Printf("%s compacting %d entries (timeout %v)", tt.logPrefix(), compactN, timeout) + if err := tt.compact(revToCompact, timeout); err != nil { plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) return } @@ -164,18 +179,18 @@ func (tt *tester) startStressers() { plog.Printf("%s started stressers", tt.logPrefix()) } -func (tt *tester) compact(rev int64) error { - plog.Printf("%s compacting storage at %d (current revision %d)", tt.logPrefix(), rev, tt.currentRevision) - if err := tt.cluster.compactKV(rev); err != nil { +func (tt *tester) compact(rev int64, timeout time.Duration) error { + plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev) + if err := tt.cluster.compactKV(rev, timeout); err != nil { plog.Printf("%s compactKV error (%v)", tt.logPrefix(), err) if cerr := tt.cleanup(); cerr != nil { return fmt.Errorf("%s, %s", err, cerr) } return err } - plog.Printf("%s compacted storage at %d", tt.logPrefix(), rev) + plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev) - plog.Printf("%s checking compaction at %d", tt.logPrefix(), rev) + plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev) if err := tt.cluster.checkCompact(rev); err != nil { plog.Printf("%s checkCompact error (%v)", tt.logPrefix(), err) if cerr := tt.cleanup(); cerr != nil { @@ -184,7 +199,7 @@ func (tt *tester) compact(rev int64) error { return err } - plog.Printf("%s confirmed compaction at %d", tt.logPrefix(), rev) + plog.Printf("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev) return nil }