functional/tester: add "Checker", remove compositeChecker

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-11 19:10:01 -07:00
parent 2fc3eb0c56
commit e9c4bad2d1
15 changed files with 287 additions and 188 deletions

View File

@ -157,7 +157,6 @@ tester-config:
round-limit: 1
exit-on-failure: true
consistency-check: true
enable-pprof: true
case-delay-ms: 7000
@ -205,7 +204,7 @@ tester-config:
runner-exec-path: ./bin/etcd-runner
external-exec-path: ""
stress-types:
stressers:
- KV
- LEASE
# - ELECTION_RUNNER
@ -213,6 +212,10 @@ tester-config:
# - LOCK_RACER_RUNNER
# - LEASE_RUNNER
checkers:
- KV_HASH
- LEASE_EXPIRE
stress-key-size: 100
stress-key-size-large: 32769
stress-key-suffix-range: 250000

View File

@ -0,0 +1,25 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/functional/rpcpb"
// Checker checks cluster consistency.
type Checker interface {
// Type returns the checker type.
Type() rpcpb.Checker
// Check returns an error if the system fails a consistency check.
Check() error
}

View File

@ -0,0 +1,91 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"time"
"github.com/coreos/etcd/functional/rpcpb"
"go.uber.org/zap"
)
const retries = 7
type hashRevGetter interface {
getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
}
type kvHashChecker struct {
ctype rpcpb.Checker
lg *zap.Logger
hrg hashRevGetter
}
func newKVHashChecker(lg *zap.Logger, hrg hashRevGetter) Checker {
return &kvHashChecker{
ctype: rpcpb.Checker_KV_HASH,
lg: lg,
hrg: hrg,
}
}
func (hc *kvHashChecker) checkRevAndHashes() (err error) {
var (
revs map[string]int64
hashes map[string]int64
)
// retries in case of transient failure or etcd cluster has not stablized yet.
for i := 0; i < retries; i++ {
revs, hashes, err = hc.hrg.getRevisionHash()
if err != nil {
hc.lg.Warn(
"failed to get revision and hash",
zap.Int("retries", i),
zap.Error(err),
)
} else {
sameRev := getSameValue(revs)
sameHashes := getSameValue(hashes)
if sameRev && sameHashes {
return nil
}
hc.lg.Warn(
"retrying; etcd cluster is not stable",
zap.Int("retries", i),
zap.Bool("same-revisions", sameRev),
zap.Bool("same-hashes", sameHashes),
zap.String("revisions", fmt.Sprintf("%+v", revs)),
zap.String("hashes", fmt.Sprintf("%+v", hashes)),
)
}
time.Sleep(time.Second)
}
if err != nil {
return fmt.Errorf("failed revision and hash check (%v)", err)
}
return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes)
}
func (hc *kvHashChecker) Type() rpcpb.Checker {
return hc.ctype
}
func (hc *kvHashChecker) Check() error {
return hc.checkRevAndHashes()
}

View File

@ -27,83 +27,30 @@ import (
"google.golang.org/grpc"
)
const retries = 7
// Checker checks cluster consistency.
type Checker interface {
// Check returns an error if the system fails a consistency check.
Check() error
type leaseExpireChecker struct {
ctype rpcpb.Checker
lg *zap.Logger
m *rpcpb.Member
ls *leaseStresser
cli *clientv3.Client
}
type hashAndRevGetter interface {
getRevisionHash() (revs map[string]int64, hashes map[string]int64, err error)
}
type hashChecker struct {
lg *zap.Logger
hrg hashAndRevGetter
}
func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker {
return &hashChecker{
lg: lg,
hrg: hrg,
func newLeaseExpireChecker(ls *leaseStresser) Checker {
return &leaseExpireChecker{
ctype: rpcpb.Checker_LEASE_EXPIRE,
lg: ls.lg,
m: ls.m,
ls: ls,
}
}
const leaseCheckerTimeout = 10 * time.Second
const leaseExpireCheckerTimeout = 10 * time.Second
func (hc *hashChecker) checkRevAndHashes() (err error) {
var (
revs map[string]int64
hashes map[string]int64
)
// retries in case of transient failure or etcd cluster has not stablized yet.
for i := 0; i < retries; i++ {
revs, hashes, err = hc.hrg.getRevisionHash()
if err != nil {
hc.lg.Warn(
"failed to get revision and hash",
zap.Int("retries", i),
zap.Error(err),
)
} else {
sameRev := getSameValue(revs)
sameHashes := getSameValue(hashes)
if sameRev && sameHashes {
return nil
}
hc.lg.Warn(
"retrying; etcd cluster is not stable",
zap.Int("retries", i),
zap.Bool("same-revisions", sameRev),
zap.Bool("same-hashes", sameHashes),
zap.String("revisions", fmt.Sprintf("%+v", revs)),
zap.String("hashes", fmt.Sprintf("%+v", hashes)),
)
}
time.Sleep(time.Second)
}
if err != nil {
return fmt.Errorf("failed revision and hash check (%v)", err)
}
return fmt.Errorf("etcd cluster is not stable: [revisions: %v] and [hashes: %v]", revs, hashes)
func (lc *leaseExpireChecker) Type() rpcpb.Checker {
return lc.ctype
}
func (hc *hashChecker) Check() error {
return hc.checkRevAndHashes()
}
type leaseChecker struct {
lg *zap.Logger
m *rpcpb.Member
ls *leaseStresser
cli *clientv3.Client
}
func (lc *leaseChecker) Check() error {
func (lc *leaseExpireChecker) Check() error {
if lc.ls == nil {
return nil
}
@ -135,8 +82,8 @@ func (lc *leaseChecker) Check() error {
}
// checkShortLivedLeases ensures leases expire.
func (lc *leaseChecker) checkShortLivedLeases() error {
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
func (lc *leaseExpireChecker) checkShortLivedLeases() error {
ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
errc := make(chan error)
defer cancel()
for leaseID := range lc.ls.shortLivedLeases.leases {
@ -154,7 +101,7 @@ func (lc *leaseChecker) checkShortLivedLeases() error {
return errsToError(errs)
}
func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
func (lc *leaseExpireChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
// retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
var resp *clientv3.LeaseTimeToLiveResponse
for i := 0; i < retries; i++ {
@ -199,7 +146,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
return err
}
func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
func (lc *leaseExpireChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
if err != nil {
lc.lg.Warn(
@ -227,8 +174,8 @@ func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID in
return nil
}
func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
func (lc *leaseExpireChecker) check(expired bool, leases map[int64]time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
defer cancel()
for leaseID := range leases {
if err := lc.checkLease(ctx, expired, leaseID); err != nil {
@ -239,7 +186,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
}
// TODO: handle failures from "grpc.FailFast(false)"
func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
func (lc *leaseExpireChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
return lc.cli.TimeToLive(
ctx,
clientv3.LeaseID(leaseID),
@ -247,7 +194,7 @@ func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clien
)
}
func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
func (lc *leaseExpireChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
// keep retrying until lease's state is known or ctx is being canceled
for ctx.Err() == nil {
resp, err := lc.getLeaseByID(ctx, leaseID)
@ -272,7 +219,7 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
// The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
// determines whether the attached keys for a given leaseID has been deleted or not
func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
func (lc *leaseExpireChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
if err != nil {
lc.lg.Warn(
@ -285,42 +232,3 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
}
return len(resp.Kvs) == 0, nil
}
// compositeChecker implements a checker that runs a slice of Checkers concurrently.
type compositeChecker struct{ checkers []Checker }
func newCompositeChecker(checkers []Checker) Checker {
return &compositeChecker{checkers}
}
func (cchecker *compositeChecker) Check() error {
errc := make(chan error)
for _, c := range cchecker.checkers {
go func(chk Checker) { errc <- chk.Check() }(c)
}
var errs []error
for range cchecker.checkers {
if err := <-errc; err != nil {
errs = append(errs, err)
}
}
return errsToError(errs)
}
type runnerChecker struct {
errc chan error
}
func (rc *runnerChecker) Check() error {
select {
case err := <-rc.errc:
return err
default:
return nil
}
}
type noChecker struct{}
func newNoChecker() Checker { return &noChecker{} }
func (nc *noChecker) Check() error { return nil }

View File

@ -0,0 +1,23 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/functional/rpcpb"
type noCheck struct{}
func newNoChecker() Checker { return &noCheck{} }
func (nc *noCheck) Type() rpcpb.Checker { return rpcpb.Checker_NO_CHECK }
func (nc *noCheck) Check() error { return nil }

View File

@ -0,0 +1,42 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import "github.com/coreos/etcd/functional/rpcpb"
type runnerChecker struct {
ctype rpcpb.Checker
errc chan error
}
func newRunnerChecker(errc chan error) Checker {
return &runnerChecker{
ctype: rpcpb.Checker_RUNNER,
errc: errc,
}
}
func (rc *runnerChecker) Type() rpcpb.Checker {
return rc.ctype
}
func (rc *runnerChecker) Check() error {
select {
case err := <-rc.errc:
return err
default:
return nil
}
}

View File

@ -56,7 +56,7 @@ type Cluster struct {
rateLimiter *rate.Limiter
stresser Stresser
checker Checker
checkers []Checker
currentRevision int64
rd int
@ -118,7 +118,7 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
int(clus.Tester.StressQPS),
)
clus.updateStresserChecker()
clus.setStresserChecker()
return clus, nil
}
@ -274,26 +274,49 @@ func (clus *Cluster) UpdateDelayLatencyMs() {
}
}
func (clus *Cluster) updateStresserChecker() {
cs := &compositeStresser{}
func (clus *Cluster) setStresserChecker() {
css := &compositeStresser{}
lss := []*leaseStresser{}
rss := []*runnerStresser{}
for _, m := range clus.Members {
cs.stressers = append(cs.stressers, newStresser(clus, m))
}
clus.stresser = cs
if clus.Tester.ConsistencyCheck {
clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
if schk := cs.Checker(); schk != nil {
clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
sss := newStresser(clus, m)
css.stressers = append(css.stressers, &compositeStresser{sss})
for _, s := range sss {
if v, ok := s.(*leaseStresser); ok {
lss = append(lss, v)
clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint))
}
if v, ok := s.(*runnerStresser); ok {
rss = append(rss, v)
clus.lg.Info("added lease stresser", zap.String("endpoint", m.EtcdClientEndpoint))
}
}
} else {
clus.checker = newNoChecker()
}
clus.stresser = css
for _, cs := range clus.Tester.Checkers {
switch cs {
case "KV_HASH":
clus.checkers = append(clus.checkers, newKVHashChecker(clus.lg, hashRevGetter(clus)))
case "LEASE_EXPIRE":
for _, ls := range lss {
clus.checkers = append(clus.checkers, newLeaseExpireChecker(ls))
}
case "RUNNER":
for _, rs := range rss {
clus.checkers = append(clus.checkers, newRunnerChecker(rs.errc))
}
case "NO_CHECK":
clus.checkers = append(clus.checkers, newNoChecker())
}
}
clus.lg.Info("updated stressers")
}
func (clus *Cluster) checkConsistency() (err error) {
func (clus *Cluster) runCheckers() (err error) {
defer func() {
if err != nil {
return
@ -307,15 +330,19 @@ func (clus *Cluster) checkConsistency() (err error) {
}
}()
if err = clus.checker.Check(); err != nil {
clus.lg.Warn(
"consistency check FAIL",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return err
for _, chk := range clus.checkers {
if err = chk.Check(); err != nil {
clus.lg.Warn(
"consistency check FAIL",
zap.String("checker", chk.Type().String()),
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
zap.Error(err),
)
return err
}
}
clus.lg.Info(
"consistency check ALL PASS",
zap.Int("round", clus.rd),

View File

@ -336,9 +336,14 @@ func read(lg *zap.Logger, fpath string) (*Cluster, error) {
}
}
for _, v := range clus.Tester.StressTypes {
if _, ok := rpcpb.StressType_value[v]; !ok {
return nil, fmt.Errorf("StressType is unknown; got %q", v)
for _, v := range clus.Tester.Stressers {
if _, ok := rpcpb.Stresser_value[v]; !ok {
return nil, fmt.Errorf("Stresser is unknown; got %q", v)
}
}
for _, v := range clus.Tester.Checkers {
if _, ok := rpcpb.Checker_value[v]; !ok {
return nil, fmt.Errorf("Checker is unknown; got %q", v)
}
}

View File

@ -237,7 +237,7 @@ func (clus *Cluster) doRound() error {
zap.Int("case-total", len(clus.cases)),
zap.String("desc", fa.Desc()),
)
if err := clus.checkConsistency(); err != nil {
if err := clus.runCheckers(); err != nil {
return fmt.Errorf("consistency check error (%v)", err)
}
@ -362,6 +362,6 @@ func (clus *Cluster) cleanup() error {
return err
}
clus.updateStresserChecker()
clus.setStresserChecker()
return nil
}

View File

@ -190,7 +190,6 @@ func Test_read(t *testing.T) {
UpdatedDelayLatencyMs: 5000,
RoundLimit: 1,
ExitOnCaseFail: true,
ConsistencyCheck: true,
EnablePprof: true,
CaseDelayMs: 7000,
CaseShuffle: true,
@ -230,7 +229,8 @@ func Test_read(t *testing.T) {
FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "./bin/etcd-runner",
ExternalExecPath: "",
StressTypes: []string{"KV", "LEASE"},
Stressers: []string{"KV", "LEASE"},
Checkers: []string{"KV_HASH", "LEASE_EXPIRE"},
StressKeySize: 100,
StressKeySizeLarge: 32769,
StressKeySuffixRange: 250000,

View File

@ -33,14 +33,12 @@ type Stresser interface {
Close() map[string]int
// ModifiedKeys reports the number of keys created and deleted by stresser
ModifiedKeys() int64
// Checker returns an invariant checker for after the stresser is canceled.
Checker() Checker
}
// newStresser creates stresser from a comma separated list of stresser types.
func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
stressers := make([]Stresser, len(clus.Tester.StressTypes))
for i, stype := range clus.Tester.StressTypes {
func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
stressers = make([]Stresser, len(clus.Tester.Stressers))
for i, stype := range clus.Tester.Stressers {
clus.lg.Info(
"creating stresser",
zap.String("type", stype),
@ -52,7 +50,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
// TODO: Too intensive stressing clients can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
stressers[i] = &keyStresser{
stype: rpcpb.StressType_KV,
stype: rpcpb.Stresser_KV,
lg: clus.lg,
m: m,
keySize: int(clus.Tester.StressKeySize),
@ -66,7 +64,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
case "LEASE":
stressers[i] = &leaseStresser{
stype: rpcpb.StressType_LEASE,
stype: rpcpb.Stresser_LEASE,
lg: clus.lg,
m: m,
numLeases: 10, // TODO: configurable
@ -86,7 +84,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.StressType_ELECTION_RUNNER,
rpcpb.Stresser_ELECTION_RUNNER,
clus.lg,
clus.Tester.RunnerExecPath,
args,
@ -107,7 +105,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.StressType_WATCH_RUNNER,
rpcpb.Stresser_WATCH_RUNNER,
clus.lg,
clus.Tester.RunnerExecPath,
args,
@ -126,7 +124,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.StressType_LOCK_RACER_RUNNER,
rpcpb.Stresser_LOCK_RACER_RUNNER,
clus.lg,
clus.Tester.RunnerExecPath,
args,
@ -141,7 +139,7 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
"--endpoints", m.EtcdClientEndpoint,
}
stressers[i] = newRunnerStresser(
rpcpb.StressType_LEASE_RUNNER,
rpcpb.Stresser_LEASE_RUNNER,
clus.lg,
clus.Tester.RunnerExecPath,
args,
@ -150,5 +148,5 @@ func newStresser(clus *Cluster, m *rpcpb.Member) Stresser {
)
}
}
return &compositeStresser{stressers}
return stressers
}

View File

@ -74,16 +74,3 @@ func (cs *compositeStresser) ModifiedKeys() (modifiedKey int64) {
}
return modifiedKey
}
func (cs *compositeStresser) Checker() Checker {
var chks []Checker
for _, s := range cs.stressers {
if chk := s.Checker(); chk != nil {
chks = append(chks, chk)
}
}
if len(chks) == 0 {
return nil
}
return newCompositeChecker(chks)
}

View File

@ -35,7 +35,7 @@ import (
)
type keyStresser struct {
stype rpcpb.StressType
stype rpcpb.Stresser
lg *zap.Logger
m *rpcpb.Member
@ -204,8 +204,6 @@ func (s *keyStresser) ModifiedKeys() int64 {
return atomic.LoadInt64(&s.atomicModifiedKeys)
}
func (s *keyStresser) Checker() Checker { return nil }
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
type stressEntry struct {

View File

@ -38,7 +38,7 @@ const (
)
type leaseStresser struct {
stype rpcpb.StressType
stype rpcpb.Stresser
lg *zap.Logger
m *rpcpb.Member
@ -485,7 +485,3 @@ func (ls *leaseStresser) Close() map[string]int {
func (ls *leaseStresser) ModifiedKeys() int64 {
return atomic.LoadInt64(&ls.atomicModifiedKey)
}
func (ls *leaseStresser) Checker() Checker {
return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
}

View File

@ -27,7 +27,7 @@ import (
)
type runnerStresser struct {
stype rpcpb.StressType
stype rpcpb.Stresser
lg *zap.Logger
cmd *exec.Cmd
@ -41,7 +41,7 @@ type runnerStresser struct {
}
func newRunnerStresser(
stype rpcpb.StressType,
stype rpcpb.Stresser,
lg *zap.Logger,
cmdStr string,
args []string,
@ -115,7 +115,3 @@ func (rs *runnerStresser) Close() map[string]int {
func (rs *runnerStresser) ModifiedKeys() int64 {
return 1
}
func (rs *runnerStresser) Checker() Checker {
return &runnerChecker{rs.errc}
}