etcd-tester: refactor

This commit is contained in:
Gyu-Ho Lee 2016-05-05 10:42:03 -07:00
parent 3bcd2b5b9f
commit cc7dd9b729
4 changed files with 228 additions and 144 deletions

View File

@ -31,7 +31,7 @@ func main() {
datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.")
stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.")
limit := flag.Int("limit", 3, "the limit of rounds to run failure set.")
limit := flag.Int("limit", -1, "the limit of rounds to run failure set (-1 to run without limits).")
schedCases := flag.String("schedule-cases", "", "test case schedule")
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")

View File

@ -0,0 +1,57 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"sync"
"time"
)
type Status struct {
Since time.Time
Failures []string
RoundLimit int
Cluster ClusterStatus
cluster *cluster
mu sync.Mutex // guards Round and Case
Round int
Case int
}
func (s *Status) setRound(r int) {
s.mu.Lock()
defer s.mu.Unlock()
s.Round = r
}
func (s *Status) getRound() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.Round
}
func (s *Status) setCase(c int) {
s.mu.Lock()
defer s.mu.Unlock()
s.Case = c
}
func (s *Status) getCase() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.Case
}

View File

@ -15,7 +15,7 @@
package main
import (
"sync"
"fmt"
"time"
)
@ -25,7 +25,8 @@ type tester struct {
limit int
consistencyCheck bool
status Status
status Status
currentRevision int64
}
func (tt *tester) runLoop() {
@ -35,201 +36,227 @@ func (tt *tester) runLoop() {
for _, f := range tt.failures {
tt.status.Failures = append(tt.status.Failures, f.Desc())
}
for i := 0; i < tt.limit; i++ {
tt.status.setRound(i)
round := 0
for {
tt.status.setRound(round)
tt.status.setCase(-1) // -1 so that logPrefix doesn't print out 'case'
roundTotalCounter.Inc()
var (
currentRevision int64
failed bool
)
var failed bool
for j, f := range tt.failures {
caseTotalCounter.WithLabelValues(f.Desc()).Inc()
tt.status.setCase(j)
if err := tt.cluster.WaitHealth(); err != nil {
plog.Printf("[round#%d case#%d] wait full health error: %v", i, j, err)
if err := tt.cleanup(i, j); err != nil {
plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
plog.Printf("%s wait full health error: %v", tt.logPrefix(), err)
if err := tt.cleanup(); err != nil {
return
}
failed = true
break
}
plog.Printf("[round#%d case#%d] start failure %s", i, j, f.Desc())
plog.Printf("[round#%d case#%d] start injecting failure...", i, j)
if err := f.Inject(tt.cluster, i); err != nil {
plog.Printf("[round#%d case#%d] injection error: %v", i, j, err)
if err := tt.cleanup(i, j); err != nil {
plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
plog.Printf("%s starting failure %s", tt.logPrefix(), f.Desc())
plog.Printf("%s injecting failure...", tt.logPrefix())
if err := f.Inject(tt.cluster, round); err != nil {
plog.Printf("%s injection error: %v", tt.logPrefix(), err)
if err := tt.cleanup(); err != nil {
return
}
failed = true
break
}
plog.Printf("[round#%d case#%d] injected failure", i, j)
plog.Printf("%s injected failure", tt.logPrefix())
plog.Printf("[round#%d case#%d] start recovering failure...", i, j)
if err := f.Recover(tt.cluster, i); err != nil {
plog.Printf("[round#%d case#%d] recovery error: %v", i, j, err)
if err := tt.cleanup(i, j); err != nil {
plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
plog.Printf("%s recovering failure...", tt.logPrefix())
if err := f.Recover(tt.cluster, round); err != nil {
plog.Printf("%s recovery error: %v", tt.logPrefix(), err)
if err := tt.cleanup(); err != nil {
return
}
failed = true
break
}
plog.Printf("[round#%d case#%d] recovered failure", i, j)
plog.Printf("%s recovered failure", tt.logPrefix())
if !tt.consistencyCheck {
continue
}
if tt.cluster.v2Only {
plog.Printf("[round#%d case#%d] succeed!", i, j)
plog.Printf("%s succeed!", tt.logPrefix())
continue
}
plog.Printf("[round#%d case#%d] canceling the stressers...", i, j)
for _, s := range tt.cluster.Stressers {
s.Cancel()
var err error
failed, err = tt.updateCurrentRevisionHash(tt.consistencyCheck)
if err != nil {
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
return
}
plog.Printf("[round#%d case#%d] canceled stressers", i, j)
plog.Printf("[round#%d case#%d] checking current revisions...", i, j)
var (
revs map[string]int64
hashes map[string]int64
rerr error
ok bool
)
for k := 0; k < 5; k++ {
time.Sleep(time.Second)
revs, hashes, rerr = tt.cluster.getRevisionHash()
if rerr != nil {
plog.Printf("[round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, rerr)
continue
}
if currentRevision, ok = getSameValue(revs); ok {
break
}
plog.Printf("[round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
}
if !ok || rerr != nil {
plog.Printf("[round#%d case#%d] checking current revisions failed [revisions: %v]", i, j, revs)
if err := tt.cleanup(i, j); err != nil {
plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
return
}
failed = true
if failed {
break
}
plog.Printf("[round#%d case#%d] all members are consistent with current revisions [revisions: %v]", i, j, revs)
plog.Printf("[round#%d case#%d] checking current storage hashes...", i, j)
if _, ok = getSameValue(hashes); !ok {
plog.Printf("[round#%d case#%d] checking current storage hashes failed [hashes: %v]", i, j, hashes)
if err := tt.cleanup(i, j); err != nil {
plog.Printf("[round#%d case#%d] cleanup error: %v", i, j, err)
return
}
failed = true
break
}
plog.Printf("[round#%d case#%d] all members are consistent with storage hashes", i, j)
plog.Printf("[round#%d case#%d] restarting the stressers...", i, j)
for _, s := range tt.cluster.Stressers {
go s.Stress()
}
plog.Printf("[round#%d case#%d] succeed!", i, j)
plog.Printf("%s succeed!", tt.logPrefix())
}
// -1 so that logPrefix doesn't print out 'case'
tt.status.setCase(-1)
if failed {
continue
}
revToCompact := max(0, currentRevision-10000)
plog.Printf("[round#%d] compacting storage at %d (current revision %d)", i, revToCompact, currentRevision)
if err := tt.cluster.compactKV(revToCompact); err != nil {
plog.Printf("[round#%d] compactKV error (%v)", i, err)
if err := tt.cleanup(i, 0); err != nil {
plog.Printf("[round#%d] cleanup error: %v", i, err)
return
}
continue
}
plog.Printf("[round#%d] compacted storage", i)
plog.Printf("[round#%d] check compaction at %d", i, revToCompact)
if err := tt.cluster.checkCompact(revToCompact); err != nil {
plog.Printf("[round#%d] checkCompact error (%v)", i, err)
if err := tt.cleanup(i, 0); err != nil {
plog.Printf("[round#%d] cleanup error: %v", i, err)
revToCompact := max(0, tt.currentRevision-10000)
if err := tt.compact(revToCompact); err != nil {
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
return
}
if round > 0 && round%500 == 0 { // every 500 rounds
if err := tt.defrag(); err != nil {
plog.Warningf("%s functional-tester returning with error (%v)", tt.logPrefix(), err)
return
}
}
plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact)
if i > 0 && i%500 == 0 { // every 500 rounds
plog.Printf("[round#%d] canceling the stressers...", i)
for _, s := range tt.cluster.Stressers {
s.Cancel()
}
plog.Printf("[round#%d] canceled stressers", i)
plog.Printf("[round#%d] deframenting...", i)
if err := tt.cluster.defrag(); err != nil {
plog.Printf("[round#%d] defrag error (%v)", i, err)
if err := tt.cleanup(i, 0); err != nil {
plog.Printf("[round#%d] cleanup error: %v", i, err)
return
}
}
plog.Printf("[round#%d] deframented...", i)
plog.Printf("[round#%d] restarting the stressers...", i)
for _, s := range tt.cluster.Stressers {
go s.Stress()
}
round++
if round == tt.limit {
plog.Printf("%s functional-tester is finished", tt.logPrefix())
break
}
}
}
func (tt *tester) cleanup(i, j int) error {
roundFailedTotalCounter.Inc()
caseFailedTotalCounter.WithLabelValues(tt.failures[j].Desc()).Inc()
func (tt *tester) logPrefix() string {
var (
rd = tt.status.getRound()
cs = tt.status.getCase()
prefix = fmt.Sprintf("[round#%d case#%d]", rd, cs)
)
if cs == -1 {
prefix = fmt.Sprintf("[round#%d]", rd)
}
return prefix
}
plog.Printf("[round#%d case#%d] cleaning up...", i, j)
func (tt *tester) cleanup() error {
roundFailedTotalCounter.Inc()
caseFailedTotalCounter.WithLabelValues(tt.failures[tt.status.Case].Desc()).Inc()
plog.Printf("%s cleaning up...", tt.logPrefix())
if err := tt.cluster.Cleanup(); err != nil {
plog.Printf("%s cleanup error: %v", tt.logPrefix(), err)
return err
}
return tt.cluster.Bootstrap()
}
type Status struct {
Since time.Time
Failures []string
RoundLimit int
Cluster ClusterStatus
cluster *cluster
mu sync.Mutex // guards Round and Case
Round int
Case int
func (tt *tester) cancelStressers() {
plog.Printf("%s canceling the stressers...", tt.logPrefix())
for _, s := range tt.cluster.Stressers {
s.Cancel()
}
plog.Printf("%s canceled stressers", tt.logPrefix())
}
func (s *Status) setRound(r int) {
s.mu.Lock()
defer s.mu.Unlock()
s.Round = r
func (tt *tester) startStressers() {
plog.Printf("%s starting the stressers...", tt.logPrefix())
for _, s := range tt.cluster.Stressers {
go s.Stress()
}
plog.Printf("%s started stressers", tt.logPrefix())
}
func (s *Status) setCase(c int) {
s.mu.Lock()
defer s.mu.Unlock()
s.Case = c
func (tt *tester) compact(rev int64) error {
plog.Printf("%s compacting storage at %d (current revision %d)", tt.logPrefix(), rev, tt.currentRevision)
if err := tt.cluster.compactKV(rev); err != nil {
plog.Printf("%s compactKV error (%v)", tt.logPrefix(), err)
if cerr := tt.cleanup(); cerr != nil {
return fmt.Errorf("%s, %s", err, cerr)
}
return err
}
plog.Printf("%s compacted storage at %d", tt.logPrefix(), rev)
plog.Printf("%s checking compaction at %d", tt.logPrefix(), rev)
if err := tt.cluster.checkCompact(rev); err != nil {
plog.Printf("%s checkCompact error (%v)", tt.logPrefix(), err)
if cerr := tt.cleanup(); cerr != nil {
return fmt.Errorf("%s, %s", err, cerr)
}
return err
}
plog.Printf("%s confirmed compaction at %d", tt.logPrefix(), rev)
return nil
}
func (tt *tester) defrag() error {
tt.cancelStressers()
defer tt.startStressers()
plog.Printf("%s defragmenting...", tt.logPrefix())
if err := tt.cluster.defrag(); err != nil {
plog.Printf("%s defrag error (%v)", tt.logPrefix(), err)
if cerr := tt.cleanup(); cerr != nil {
return fmt.Errorf("%s, %s", err, cerr)
}
return err
}
plog.Printf("%s defragmented...", tt.logPrefix())
return nil
}
func (tt *tester) updateCurrentRevisionHash(check bool) (failed bool, err error) {
if check {
tt.cancelStressers()
defer tt.startStressers()
}
plog.Printf("%s updating current revisions...", tt.logPrefix())
var (
revs map[string]int64
hashes map[string]int64
rerr error
ok bool
)
for i := 0; i < 7; i++ {
revs, hashes, rerr = tt.cluster.getRevisionHash()
if rerr != nil {
plog.Printf("%s #%d failed to get current revisions (%v)", tt.logPrefix(), i, rerr)
continue
}
if tt.currentRevision, ok = getSameValue(revs); ok {
break
}
plog.Printf("%s #%d inconsistent current revisions %+v", tt.logPrefix(), i, revs)
if !check {
break // if consistency check is false, just try once
}
time.Sleep(time.Second)
}
plog.Printf("%s updated current revisions with %d", tt.logPrefix(), tt.currentRevision)
if !check {
failed = false
return
}
if !ok || rerr != nil {
plog.Printf("%s checking current revisions failed [revisions: %v]", tt.logPrefix(), revs)
failed = true
err = tt.cleanup()
return
}
plog.Printf("%s all members are consistent with current revisions [revisions: %v]", tt.logPrefix(), revs)
plog.Printf("%s checking current storage hashes...", tt.logPrefix())
if _, ok = getSameValue(hashes); !ok {
plog.Printf("%s checking current storage hashes failed [hashes: %v]", tt.logPrefix(), hashes)
failed = true
err = tt.cleanup()
return
}
plog.Printf("%s all members are consistent with storage hashes", tt.logPrefix())
return
}

View File

@ -21,7 +21,7 @@ func getSameValue(vals map[string]int64) (int64, bool) {
rv = v
}
if rv != v {
return -1, false
return rv, false
}
}
return rv, true