diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index c5d8a4225..434cba72e 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -79,7 +79,6 @@ func (s *stresser) Stress() error { }) putcancel() if grpc.ErrorDesc(err) == context.Canceled.Error() { - log.Printf("etcd-tester: stresser#%d is cancelled", i) return } s.mu.Lock() diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 66b3aaba6..979aeadb9 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -55,6 +55,7 @@ func (tt *tester) runLoop() { continue } log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc()) + log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j) if err := f.Inject(tt.cluster, i); err != nil { log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err) @@ -64,6 +65,8 @@ func (tt *tester) runLoop() { } continue } + log.Printf("etcd-tester: [round#%d case#%d] injected failure", i, j) + log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j) if err := f.Recover(tt.cluster, i); err != nil { log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err) @@ -73,6 +76,7 @@ func (tt *tester) runLoop() { } continue } + log.Printf("etcd-tester: [round#%d case#%d] recovered failure", i, j) if tt.cluster.v2Only { log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j) @@ -83,25 +87,26 @@ func (tt *tester) runLoop() { for _, s := range tt.cluster.Stressers { s.Cancel() } + log.Printf("etcd-tester: [round#%d case#%d] canceled stressers", i, j) + log.Printf("etcd-tester: [round#%d case#%d] checking current revisions...", i, j) ok := false + var currentRevision int64 for k := 0; k < 5; k++ { time.Sleep(time.Second) - log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k) revs, err := tt.cluster.getRevision() if err != nil { if e := tt.cleanup(i, j); e != nil { log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e) return } - log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err) + log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, err) continue } - if ok = isSameValueInMap(revs); ok { - log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions succeed!", i, j, k) + if currentRevision, ok = getSameValue(revs); ok { break } else { - log.Printf("etcd-tester: [round#%d case#%d.%d] current revisions %+v", i, j, k, revs) + log.Printf("etcd-tester: [round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs) } } if !ok { @@ -112,6 +117,7 @@ func (tt *tester) runLoop() { } continue } + log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with current revisions", i, j) log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j) hashes, err := tt.cluster.getKVHash() @@ -122,20 +128,32 @@ func (tt *tester) runLoop() { return } } - if !isSameValueInMap(hashes) { + if _, ok = getSameValue(hashes); !ok { if err := tt.cleanup(i, j); err != nil { log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err) return } continue } - log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j) - log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j) + log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with storage hashes", i, j) + + revToCompact := max(0, currentRevision-10000) + log.Printf("etcd-tester: [round#%d case#%d] compacting storage at %d (current revision %d)", i, j, revToCompact, currentRevision) + if err := tt.cluster.compactKV(revToCompact); err != nil { + log.Printf("etcd-tester: [round#%d case#%d] compactKV error (%v)", i, j, err) + if err := tt.cleanup(i, j); err != nil { + log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err) + return + } + } + log.Printf("etcd-tester: [round#%d case#%d] compacted storage", i, j) log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j) for _, s := range tt.cluster.Stressers { go s.Stress() } + + log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j) } } } @@ -221,7 +239,7 @@ func (c *cluster) getKVHash() (map[string]int64, error) { return hashes, nil } -func isSameValueInMap(hashes map[string]int64) bool { +func getSameValue(hashes map[string]int64) (int64, bool) { var rv int64 ok := true for _, v := range hashes { @@ -233,5 +251,33 @@ func isSameValueInMap(hashes map[string]int64) bool { break } } - return ok + return rv, ok +} + +func max(n1, n2 int64) int64 { + if n1 > n2 { + return n1 + } + return n2 +} + +func (c *cluster) compactKV(rev int64) error { + var ( + conn *grpc.ClientConn + err error + ) + for _, u := range c.GRPCURLs { + conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) + if err != nil { + continue + } + kvc := pb.NewKVClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev}) + cancel() + if err == nil { + return nil + } + } + return err }