From cc7dd9b729c8bcfca912176033f7adcfe4038bac Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 5 May 2016 10:42:03 -0700 Subject: [PATCH] etcd-tester: refactor --- tools/functional-tester/etcd-tester/main.go | 2 +- tools/functional-tester/etcd-tester/status.go | 57 ++++ tools/functional-tester/etcd-tester/tester.go | 311 ++++++++++-------- tools/functional-tester/etcd-tester/util.go | 2 +- 4 files changed, 228 insertions(+), 144 deletions(-) create mode 100644 tools/functional-tester/etcd-tester/status.go diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index f495368ae..ea1da6a64 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -31,7 +31,7 @@ func main() { datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.") stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.") stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.") - limit := flag.Int("limit", 3, "the limit of rounds to run failure set.") + limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).") schedCases := flag.String("schedule-cases", "", "test case schedule") consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)") isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.") diff --git a/tools/functional-tester/etcd-tester/status.go b/tools/functional-tester/etcd-tester/status.go new file mode 100644 index 000000000..1151f92bd --- /dev/null +++ b/tools/functional-tester/etcd-tester/status.go @@ -0,0 +1,57 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "sync" + "time" +) + +type Status struct { + Since time.Time + Failures []string + RoundLimit int + + Cluster ClusterStatus + cluster *cluster + + mu sync.Mutex // guards Round and Case + Round int + Case int +} + +func (s *Status) setRound(r int) { + s.mu.Lock() + defer s.mu.Unlock() + s.Round = r +} + +func (s *Status) getRound() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.Round +} + +func (s *Status) setCase(c int) { + s.mu.Lock() + defer s.mu.Unlock() + s.Case = c +} + +func (s *Status) getCase() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.Case +} diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 448bc3aff..94ef35f08 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -15,7 +15,7 @@ package main import ( - "sync" + "fmt" "time" ) @@ -25,7 +25,8 @@ type tester struct { limit int consistencyCheck bool - status Status + status Status + currentRevision int64 } func (tt *tester) runLoop() { @@ -35,201 +36,227 @@ func (tt *tester) runLoop() { for _, f := range tt.failures { tt.status.Failures = append(tt.status.Failures, f.Desc()) } - for i := 0; i < tt.limit; i++ { - tt.status.setRound(i) + round := 0 + for { + tt.status.setRound(round) + tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case' roundTotalCounter.Inc() - var ( - currentRevision int64 - failed bool - ) + var failed bool for j, f := range tt.failures { caseTotalCounter.WithLabelValues(f.Desc()).Inc() tt.status.setCase(j) if err := tt.cluster.WaitHealth(); err != nil { - plog.Printf("[round#%d case#%d] wait full health error: %v", i, j, err) - if err := tt.cleanup(i, j); err != nil { - plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err) + plog.Printf("%s wait full health error: %v", tt.logPrefix(), err) + if err := tt.cleanup(); err != nil { return } failed = true break } - plog.Printf("[round#%d case#%d] start failure %s", i, j, f.Desc()) - plog.Printf("[round#%d case#%d] start injecting failure...", i, j) - if err := f.Inject(tt.cluster, i); err != nil { - plog.Printf("[round#%d case#%d] injection error: %v", i, j, err) - if err := tt.cleanup(i, j); err != nil { - plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err) + plog.Printf("%s starting failure %s", tt.logPrefix(), f.Desc()) + + plog.Printf("%s injecting failure...", tt.logPrefix()) + if err := f.Inject(tt.cluster, round); err != nil { + plog.Printf("%s injection error: %v", tt.logPrefix(), err) + if err := tt.cleanup(); err != nil { return } failed = true break } - plog.Printf("[round#%d case#%d] injected failure", i, j) + plog.Printf("%s injected failure", tt.logPrefix()) - plog.Printf("[round#%d case#%d] start recovering failure...", i, j) - if err := f.Recover(tt.cluster, i); err != nil { - plog.Printf("[round#%d case#%d] recovery error: %v", i, j, err) - if err := tt.cleanup(i, j); err != nil { - plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err) + plog.Printf("%s recovering failure...", tt.logPrefix()) + if err := f.Recover(tt.cluster, round); err != nil { + plog.Printf("%s recovery error: %v", tt.logPrefix(), err) + if err := tt.cleanup(); err != nil { return } failed = true break } - plog.Printf("[round#%d case#%d] recovered failure", i, j) + plog.Printf("%s recovered failure", tt.logPrefix()) - if !tt.consistencyCheck { - continue - } if tt.cluster.v2Only { - plog.Printf("[round#%d case#%d] succeed!", i, j) + plog.Printf("%s succeed!", tt.logPrefix()) continue } - plog.Printf("[round#%d case#%d] canceling the stressers...", i, j) - for _, s := range tt.cluster.Stressers { - s.Cancel() + var err error + failed, err = tt.updateCurrentRevisionHash(tt.consistencyCheck) + if err != nil { + plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) + return } - plog.Printf("[round#%d case#%d] canceled stressers", i, j) - - plog.Printf("[round#%d case#%d] checking current revisions...", i, j) - var ( - revs map[string]int64 - hashes map[string]int64 - rerr error - ok bool - ) - for k := 0; k < 5; k++ { - time.Sleep(time.Second) - - revs, hashes, rerr = tt.cluster.getRevisionHash() - if rerr != nil { - plog.Printf("[round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, rerr) - continue - } - if currentRevision, ok = getSameValue(revs); ok { - break - } - - plog.Printf("[round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs) - } - if !ok || rerr != nil { - plog.Printf("[round#%d case#%d] checking current revisions failed [revisions: %v]", i, j, revs) - if err := tt.cleanup(i, j); err != nil { - plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err) - return - } - failed = true + if failed { break } - plog.Printf("[round#%d case#%d] all members are consistent with current revisions [revisions: %v]", i, j, revs) - - plog.Printf("[round#%d case#%d] checking current storage hashes...", i, j) - if _, ok = getSameValue(hashes); !ok { - plog.Printf("[round#%d case#%d] checking current storage hashes failed [hashes: %v]", i, j, hashes) - if err := tt.cleanup(i, j); err != nil { - plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err) - return - } - failed = true - break - } - plog.Printf("[round#%d case#%d] all members are consistent with storage hashes", i, j) - - plog.Printf("[round#%d case#%d] restarting the stressers...", i, j) - for _, s := range tt.cluster.Stressers { - go s.Stress() - } - - plog.Printf("[round#%d case#%d] succeed!", i, j) + plog.Printf("%s succeed!", tt.logPrefix()) } + // -1 so that logPrefix doesn't print out 'case' + tt.status.setCase(-1) + if failed { continue } - revToCompact := max(0, currentRevision-10000) - plog.Printf("[round#%d] compacting storage at %d (current revision %d)", i, revToCompact, currentRevision) - if err := tt.cluster.compactKV(revToCompact); err != nil { - plog.Printf("[round#%d] compactKV error (%v)", i, err) - if err := tt.cleanup(i, 0); err != nil { - plog.Printf("[round#%d] cleanup error: %v", i, err) - return - } - continue - } - plog.Printf("[round#%d] compacted storage", i) - plog.Printf("[round#%d] check compaction at %d", i, revToCompact) - if err := tt.cluster.checkCompact(revToCompact); err != nil { - plog.Printf("[round#%d] checkCompact error (%v)", i, err) - if err := tt.cleanup(i, 0); err != nil { - plog.Printf("[round#%d] cleanup error: %v", i, err) + revToCompact := max(0, tt.currentRevision-10000) + if err := tt.compact(revToCompact); err != nil { + plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) + return + } + if round > 0 && round%500 == 0 { // every 500 rounds + if err := tt.defrag(); err != nil { + plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err) return } } - plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact) - if i > 0 && i%500 == 0 { // every 500 rounds - plog.Printf("[round#%d] canceling the stressers...", i) - for _, s := range tt.cluster.Stressers { - s.Cancel() - } - plog.Printf("[round#%d] canceled stressers", i) - - plog.Printf("[round#%d] deframenting...", i) - if err := tt.cluster.defrag(); err != nil { - plog.Printf("[round#%d] defrag error (%v)", i, err) - if err := tt.cleanup(i, 0); err != nil { - plog.Printf("[round#%d] cleanup error: %v", i, err) - return - } - } - plog.Printf("[round#%d] deframented...", i) - - plog.Printf("[round#%d] restarting the stressers...", i) - for _, s := range tt.cluster.Stressers { - go s.Stress() - } + round++ + if round == tt.limit { + plog.Printf("%s functional-tester is finished", tt.logPrefix()) + break } } } -func (tt *tester) cleanup(i, j int) error { - roundFailedTotalCounter.Inc() - caseFailedTotalCounter.WithLabelValues(tt.failures[j].Desc()).Inc() +func (tt *tester) logPrefix() string { + var ( + rd = tt.status.getRound() + cs = tt.status.getCase() + prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs) + ) + if cs == -1 { + prefix = fmt.Sprintf("[round#%d]", rd) + } + return prefix +} - plog.Printf("[round#%d case#%d] cleaning up...", i, j) +func (tt *tester) cleanup() error { + roundFailedTotalCounter.Inc() + caseFailedTotalCounter.WithLabelValues(tt.failures[tt.status.Case].Desc()).Inc() + + plog.Printf("%s cleaning up...", tt.logPrefix()) if err := tt.cluster.Cleanup(); err != nil { + plog.Printf("%s cleanup error: %v", tt.logPrefix(), err) return err } return tt.cluster.Bootstrap() } -type Status struct { - Since time.Time - Failures []string - RoundLimit int - - Cluster ClusterStatus - cluster *cluster - - mu sync.Mutex // guards Round and Case - Round int - Case int +func (tt *tester) cancelStressers() { + plog.Printf("%s canceling the stressers...", tt.logPrefix()) + for _, s := range tt.cluster.Stressers { + s.Cancel() + } + plog.Printf("%s canceled stressers", tt.logPrefix()) } -func (s *Status) setRound(r int) { - s.mu.Lock() - defer s.mu.Unlock() - s.Round = r +func (tt *tester) startStressers() { + plog.Printf("%s starting the stressers...", tt.logPrefix()) + for _, s := range tt.cluster.Stressers { + go s.Stress() + } + plog.Printf("%s started stressers", tt.logPrefix()) } -func (s *Status) setCase(c int) { - s.mu.Lock() - defer s.mu.Unlock() - s.Case = c +func (tt *tester) compact(rev int64) error { + plog.Printf("%s compacting storage at %d (current revision %d)", tt.logPrefix(), rev, tt.currentRevision) + if err := tt.cluster.compactKV(rev); err != nil { + plog.Printf("%s compactKV error (%v)", tt.logPrefix(), err) + if cerr := tt.cleanup(); cerr != nil { + return fmt.Errorf("%s, %s", err, cerr) + } + return err + } + plog.Printf("%s compacted storage at %d", tt.logPrefix(), rev) + + plog.Printf("%s checking compaction at %d", tt.logPrefix(), rev) + if err := tt.cluster.checkCompact(rev); err != nil { + plog.Printf("%s checkCompact error (%v)", tt.logPrefix(), err) + if cerr := tt.cleanup(); cerr != nil { + return fmt.Errorf("%s, %s", err, cerr) + } + return err + } + + plog.Printf("%s confirmed compaction at %d", tt.logPrefix(), rev) + return nil +} + +func (tt *tester) defrag() error { + tt.cancelStressers() + defer tt.startStressers() + + plog.Printf("%s defragmenting...", tt.logPrefix()) + if err := tt.cluster.defrag(); err != nil { + plog.Printf("%s defrag error (%v)", tt.logPrefix(), err) + if cerr := tt.cleanup(); cerr != nil { + return fmt.Errorf("%s, %s", err, cerr) + } + return err + } + + plog.Printf("%s defragmented...", tt.logPrefix()) + return nil +} + +func (tt *tester) updateCurrentRevisionHash(check bool) (failed bool, err error) { + if check { + tt.cancelStressers() + defer tt.startStressers() + } + + plog.Printf("%s updating current revisions...", tt.logPrefix()) + var ( + revs map[string]int64 + hashes map[string]int64 + rerr error + ok bool + ) + for i := 0; i < 7; i++ { + revs, hashes, rerr = tt.cluster.getRevisionHash() + if rerr != nil { + plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr) + continue + } + if tt.currentRevision, ok = getSameValue(revs); ok { + break + } + + plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs) + if !check { + break // if consistency check is false, just try once + } + + time.Sleep(time.Second) + } + plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision) + + if !check { + failed = false + return + } + + if !ok || rerr != nil { + plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs) + failed = true + err = tt.cleanup() + return + } + plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs) + + plog.Printf("%s checking current storage hashes...", tt.logPrefix()) + if _, ok = getSameValue(hashes); !ok { + plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes) + failed = true + err = tt.cleanup() + return + } + plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix()) + return } diff --git a/tools/functional-tester/etcd-tester/util.go b/tools/functional-tester/etcd-tester/util.go index df7aec65d..6c8c2aba5 100644 --- a/tools/functional-tester/etcd-tester/util.go +++ b/tools/functional-tester/etcd-tester/util.go @@ -21,7 +21,7 @@ func getSameValue(vals map[string]int64) (int64, bool) { rv = v } if rv != v { - return -1, false + return rv, false } } return rv, true