mirror of https://github.com/etcd-io/etcd.git
Merge pull request #6758 from heyitsanthony/move-checker
etcd-tester: refactor stresser / checker management
This commit is contained in: commit 4969a0e9e7
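The refactor in this pull request moves checker construction onto each stresser instead of building checkers on the cluster. As orientation, here is a minimal sketch of the two interfaces involved, paraphrased from the hunks below rather than copied verbatim from any one file (the package name is a placeholder):

package tester

// Checker verifies a cluster invariant after stressing stops.
type Checker interface {
	Check() error
}

// Stresser generates load; once canceled, its Checker (possibly nil)
// validates whatever invariant that stresser maintains.
type Stresser interface {
	// Stress starts to stress the etcd cluster
	Stress() error
	// Cancel stops the stresser
	Cancel()
	// Report reports the success and failure of the stress test
	Report() (success int, failure int)
	// Checker returns an invariant checker for after the stresser is canceled.
	Checker() Checker
}

With this shape, the composite stresser can collect the non-nil checkers from its children and wrap them in a composite checker, which is what the tester now does when it resets stressing and checking.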
@ -17,7 +17,6 @@ package main
import (
"fmt"
"reflect"
"strings"
"time"

"golang.org/x/net/context"
@ -133,76 +132,31 @@ func (hc *hashChecker) Check() error {
return hc.checkRevAndHashes()
}

type leaseChecker struct {
leaseStressers []Stresser
}

func newLeaseChecker(leaseStressers []Stresser) Checker { return &leaseChecker{leaseStressers} }
type leaseChecker struct{ ls *leaseStresser }

func (lc *leaseChecker) Check() error {
plog.Info("lease stresser invariant check...")
errc := make(chan error)
for _, ls := range lc.leaseStressers {
go func(s Stresser) { errc <- lc.checkInvariant(s) }(ls)
}
var errs []error
for i := 0; i < len(lc.leaseStressers); i++ {
if err := <-errc; err != nil {
errs = append(errs, err)
}
}

if len(errs) == 0 {
return nil
}
return fmt.Errorf("lease stresser encounters error: (%v)", fromErrsToString(errs))
}

func fromErrsToString(errs []error) string {
stringArr := make([]string, len(errs))
for i, err := range errs {
stringArr[i] = err.Error()
}
return strings.Join(stringArr, ",")
}

func (lc *leaseChecker) checkInvariant(lStresser Stresser) error {
ls := lStresser.(*leaseStresser)
if err := checkLeasesExpired(ls); err != nil {
plog.Infof("checking revoked leases %v", lc.ls.revokedLeases.leases)
if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
return err
}
if err := checkLeasesAlive(ls); err != nil {
plog.Infof("checking alive leases %v", lc.ls.aliveLeases.leases)
if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
return err
}
return checkShortLivedLeases(ls)
plog.Infof("checking short lived leases %v", lc.ls.shortLivedLeases.leases)
return lc.check(true, lc.ls.shortLivedLeases.leases)
}

func checkLeasesExpired(ls *leaseStresser) error {
plog.Infof("revoked leases %v", ls.revokedLeases.getLeasesMap())
return checkLeases(true, ls, ls.revokedLeases.getLeasesMap())
}

func checkLeasesAlive(ls *leaseStresser) error {
plog.Infof("alive leases %v", ls.aliveLeases.getLeasesMap())
return checkLeases(false, ls, ls.aliveLeases.getLeasesMap())
}

// checkShortLivedLeases() verifies that the short lived leases are indeed being deleted.
func checkShortLivedLeases(ls *leaseStresser) error {
plog.Infof("short lived leases %v", ls.shortLivedLeases.getLeasesMap())
return checkLeases(true, ls, ls.shortLivedLeases.getLeasesMap())
}

func checkLeases(expired bool, ls *leaseStresser, leases map[int64]time.Time) error {
func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), leaseCheckerTimeout)
defer cancel()
for leaseID := range leases {
keysExpired, err := ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID)
if err != nil {
plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err)
return err
}
leaseExpired, err := ls.hasLeaseExpired(ctx, leaseID)
leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID)
if err != nil {
plog.Errorf("hasLeaseExpired error: (%v)", err)
return err
@ -217,22 +171,25 @@ func checkLeases(expired bool, ls *leaseStresser, leases map[int64]time.Time) er
return nil
}

type compositeChecker struct {
checkers []Checker
}
// compositeChecker implements a checker that runs a slice of Checkers concurrently.
type compositeChecker struct{ checkers []Checker }

func newCompositeChecker(checkers []Checker) Checker {
return &compositeChecker{checkers}
}

func (cchecker *compositeChecker) Check() error {
for _, checker := range cchecker.checkers {
if err := checker.Check(); err != nil {
return err
errc := make(chan error)
for _, c := range cchecker.checkers {
go func(chk Checker) { errc <- chk.Check() }(c)
}
var errs []error
for range cchecker.checkers {
if err := <-errc; err != nil {
errs = append(errs, err)
}
}

return nil
return errsToError(errs)
}

type noChecker struct{}

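The new compositeChecker.Check fans each child Check out to its own goroutine and aggregates failures instead of stopping at the first error. A self-contained sketch of that pattern, with hypothetical okCheck/errCheck checkers standing in for the real hash and lease checkers:

package main

import (
	"errors"
	"fmt"
)

type Checker interface{ Check() error }

type okCheck struct{}

func (okCheck) Check() error { return nil }

type errCheck struct{ msg string }

func (c errCheck) Check() error { return errors.New(c.msg) }

// compositeCheck mirrors compositeChecker.Check above: run every checker
// in its own goroutine, collect the failures, and report them together.
func compositeCheck(checkers []Checker) error {
	errc := make(chan error)
	for _, c := range checkers {
		go func(chk Checker) { errc <- chk.Check() }(c)
	}
	var errs []error
	for range checkers {
		if err := <-errc; err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("%d checkers failed: %v", len(errs), errs)
}

func main() {
	err := compositeCheck([]Checker{okCheck{}, errCheck{"hash mismatch"}})
	fmt.Println(err) // 1 checkers failed: [hash mismatch]
}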
@ -39,17 +39,8 @@ type agentConfig struct {
}

type cluster struct {
agents []agentConfig

v2Only bool // to be deprecated
consistencyCheck bool
Size int

Stressers []Stresser
stressBuilder stressBuilder
leaseStresserBuilder leaseStresserBuilder
Checker Checker

agents []agentConfig
Size int
Members []*member
}

@ -100,27 +91,6 @@ func (c *cluster) bootstrap() error {
}
}

c.Stressers = make([]Stresser, 0)
leaseStressers := make([]Stresser, len(members))
for i, m := range members {
lStresser := c.leaseStresserBuilder(m)
leaseStressers[i] = lStresser
c.Stressers = append(c.Stressers, c.stressBuilder(m), lStresser)
}

for i := range c.Stressers {
go c.Stressers[i].Stress()
}

var checkers []Checker
if c.consistencyCheck && !c.v2Only {
checkers = append(checkers, newHashChecker(hashAndRevGetter(c)), newLeaseChecker(leaseStressers))
} else {
checkers = append(checkers, newNoChecker())
}

c.Checker = newCompositeChecker(checkers)

c.Size = size
c.Members = members
return nil
@ -134,13 +104,9 @@ func (c *cluster) WaitHealth() error {
|
||||
// TODO: set it to a reasonable value. It is set that high because
|
||||
// follower may use long time to catch up the leader when reboot under
|
||||
// reasonable workload (https://github.com/coreos/etcd/issues/2698)
|
||||
healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
|
||||
if c.v2Only {
|
||||
healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
|
||||
}
|
||||
for i := 0; i < 60; i++ {
|
||||
for _, m := range c.Members {
|
||||
if err = healthFunc(m); err != nil {
|
||||
if err = m.SetHealthKeyV3(); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -155,9 +121,6 @@ func (c *cluster) WaitHealth() error {
|
||||
|
||||
// GetLeader returns the index of leader and error if any.
|
||||
func (c *cluster) GetLeader() (int, error) {
|
||||
if c.v2Only {
|
||||
return 0, nil
|
||||
}
|
||||
for i, m := range c.Members {
|
||||
isLeader, err := m.IsLeader()
|
||||
if isLeader || err != nil {
|
||||
@ -167,15 +130,6 @@ func (c *cluster) GetLeader() (int, error) {
|
||||
return 0, fmt.Errorf("no leader found")
|
||||
}
|
||||
|
||||
func (c *cluster) Report() (success, failure int) {
|
||||
for _, stress := range c.Stressers {
|
||||
s, f := stress.Report()
|
||||
success += s
|
||||
failure += f
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *cluster) Cleanup() error {
|
||||
var lasterr error
|
||||
for _, m := range c.Members {
|
||||
@ -183,10 +137,6 @@ func (c *cluster) Cleanup() error {
|
||||
lasterr = err
|
||||
}
|
||||
}
|
||||
for _, s := range c.Stressers {
|
||||
s.Cancel()
|
||||
}
|
||||
|
||||
return lasterr
|
||||
}
|
||||
|
||||
@ -194,9 +144,6 @@ func (c *cluster) Terminate() {
|
||||
for _, m := range c.Members {
|
||||
m.Agent.Terminate()
|
||||
}
|
||||
for _, s := range c.Stressers {
|
||||
s.Cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cluster) Status() ClusterStatus {
|
||||
@ -217,6 +164,29 @@ func (c *cluster) Status() ClusterStatus {
|
||||
return cs
|
||||
}
|
||||
|
||||
// maxRev returns the maximum revision found on the cluster.
|
||||
func (c *cluster) maxRev() (rev int64, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
|
||||
defer cancel()
|
||||
revc, errc := make(chan int64, len(c.Members)), make(chan error, len(c.Members))
|
||||
for i := range c.Members {
|
||||
go func(m *member) {
|
||||
mrev, merr := m.Rev(ctx)
|
||||
revc <- mrev
|
||||
errc <- merr
|
||||
}(c.Members[i])
|
||||
}
|
||||
for i := 0; i < len(c.Members); i++ {
|
||||
if merr := <-errc; merr != nil {
|
||||
err = merr
|
||||
}
|
||||
if mrev := <-revc; mrev > rev {
|
||||
rev = mrev
|
||||
}
|
||||
}
|
||||
return rev, err
|
||||
}
|
||||
|
||||
func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
|
||||
revs := make(map[string]int64)
|
||||
hashes := make(map[string]int64)
|
||||
|
@ -136,26 +136,31 @@ func (f *failureUntilSnapshot) Inject(c *cluster, round int) error {
|
||||
if err := f.failure.Inject(c, round); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.Size < 3 {
|
||||
return nil
|
||||
}
|
||||
|
||||
start, _ := c.Report()
|
||||
end := start
|
||||
// maxRev may fail since failure just injected, retry if failed.
|
||||
startRev, err := c.maxRev()
|
||||
for i := 0; i < 10 && startRev == 0; i++ {
|
||||
startRev, err = c.maxRev()
|
||||
}
|
||||
if startRev == 0 {
|
||||
return err
|
||||
}
|
||||
lastRev := startRev
|
||||
// Normal healthy cluster could accept 1000req/s at least.
|
||||
// Give it 3-times time to create a new snapshot.
|
||||
retry := snapshotCount / 1000 * 3
|
||||
for j := 0; j < retry; j++ {
|
||||
end, _ = c.Report()
|
||||
lastRev, err = c.maxRev()
|
||||
// If the number of proposals committed is bigger than snapshot count,
|
||||
// a new snapshot should have been created.
|
||||
if end-start > snapshotCount {
|
||||
if lastRev-startRev > snapshotCount {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", end-start, retry)
|
||||
return fmt.Errorf("cluster too slow: only commit %d requests in %ds", lastRev-startRev, retry)
|
||||
}
|
||||
|
||||
func (f *failureUntilSnapshot) Desc() string {
|
||||
|
tools/functional-tester/etcd-tester/key_stresser.go (new file, 249 lines)
@ -0,0 +1,249 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context" // grpc does a comparison on context.Cancel; can't use "context" package
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/transport"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
)
|
||||
|
||||
type keyStresser struct {
|
||||
Endpoint string
|
||||
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
|
||||
N int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
cancel func()
|
||||
conn *grpc.ClientConn
|
||||
|
||||
success int
|
||||
failure int
|
||||
|
||||
stressTable *stressTable
|
||||
}
|
||||
|
||||
func (s *keyStresser) Stress() error {
|
||||
// TODO: add backoff option
|
||||
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, s.Endpoint)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
s.wg.Add(s.N)
|
||||
s.conn = conn
|
||||
s.cancel = cancel
|
||||
|
||||
kvc := pb.NewKVClient(conn)
|
||||
|
||||
var stressEntries = []stressEntry{
|
||||
{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
|
||||
{
|
||||
weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
|
||||
f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
|
||||
},
|
||||
{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
|
||||
}
|
||||
s.stressTable = createStressTable(stressEntries)
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
go s.run(ctx)
|
||||
}
|
||||
|
||||
plog.Infof("keyStresser %q is started", s.Endpoint)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *keyStresser) run(ctx context.Context) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: 10-second is enough timeout to cover leader failure
|
||||
// and immediate leader election. Find out what other cases this
|
||||
// could be timed out.
|
||||
sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
err := s.stressTable.choose()(sctx)
|
||||
scancel()
|
||||
if err == nil {
|
||||
s.mu.Lock()
|
||||
s.success++
|
||||
s.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.failure++
|
||||
s.mu.Unlock()
|
||||
switch grpc.ErrorDesc(err) {
|
||||
case context.DeadlineExceeded.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure. When we terminate the leader, the request to
|
||||
// that leader cannot be processed, and times out. Also requests
|
||||
// to followers cannot be forwarded to the old leader, so timing out
|
||||
// as well. We want to keep stressing until the cluster elects a
|
||||
// new leader and start processing requests again.
|
||||
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure and follower nodes receive time out errors
|
||||
// from losing their leader. Followers should retry to connect
|
||||
// to the new leader.
|
||||
case etcdserver.ErrStopped.Error():
|
||||
// one of the etcd nodes stopped from failure injection
|
||||
case transport.ErrConnClosing.Desc:
|
||||
// server closed the transport (failure injected node)
|
||||
case rpctypes.ErrNotCapable.Error():
|
||||
// capability check has not been done (in the beginning)
|
||||
case rpctypes.ErrTooManyRequests.Error():
|
||||
// hitting the recovering member.
|
||||
case context.Canceled.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
case grpc.ErrClientConnClosing.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
default:
|
||||
su, fa := s.Report()
|
||||
plog.Errorf("keyStresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *keyStresser) Cancel() {
|
||||
s.cancel()
|
||||
s.conn.Close()
|
||||
s.wg.Wait()
|
||||
plog.Infof("keyStresser %q is canceled", s.Endpoint)
|
||||
}
|
||||
|
||||
func (s *keyStresser) Report() (int, int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.success, s.failure
|
||||
}
|
||||
|
||||
func (s *keyStresser) Checker() Checker { return nil }
|
||||
|
||||
type stressFunc func(ctx context.Context) error
|
||||
|
||||
type stressEntry struct {
|
||||
weight float32
|
||||
f stressFunc
|
||||
}
|
||||
|
||||
type stressTable struct {
|
||||
entries []stressEntry
|
||||
sumWeights float32
|
||||
}
|
||||
|
||||
func createStressTable(entries []stressEntry) *stressTable {
|
||||
st := stressTable{entries: entries}
|
||||
for _, entry := range st.entries {
|
||||
st.sumWeights += entry.weight
|
||||
}
|
||||
return &st
|
||||
}
|
||||
|
||||
func (st *stressTable) choose() stressFunc {
|
||||
v := rand.Float32() * st.sumWeights
|
||||
var sum float32
|
||||
var idx int
|
||||
for i := range st.entries {
|
||||
sum += st.entries[i].weight
|
||||
if sum >= v {
|
||||
idx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
return st.entries[idx].f
|
||||
}
|
||||
|
||||
func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
Value: randBytes(keySize),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
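The stressTable/choose pair above selects the next operation by weight: draw a uniform point in [0, sumWeights) and walk the entries until the running total passes it. A runnable sketch of the same weighted-selection technique in isolation (the entry names and weights here are illustrative, not the ones used by the tester):

package main

import (
	"fmt"
	"math/rand"
)

type entry struct {
	weight float32
	name   string
}

// chooseWeighted mirrors stressTable.choose: draw a point in [0, sum of weights)
// and walk the entries until the cumulative weight reaches it.
func chooseWeighted(entries []entry) string {
	var sum float32
	for _, e := range entries {
		sum += e.weight
	}
	v := rand.Float32() * sum
	var acc float32
	idx := 0
	for i, e := range entries {
		acc += e.weight
		if acc >= v {
			idx = i
			break
		}
	}
	return entries[idx].name
}

func main() {
	entries := []entry{{0.7, "put"}, {0.07, "range"}, {0.07, "delete"}}
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[chooseWeighted(entries)]++
	}
	fmt.Println(counts) // counts come out roughly proportional to the weights
}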
|
@ -32,16 +32,8 @@ const (
|
||||
// time to live for lease
|
||||
TTL = 120
|
||||
TTLShort = 2
|
||||
// leasesStressRoundPs indicates the rate that leaseStresser.run() creates and deletes leases per second
|
||||
leasesStressRoundPs = 1
|
||||
)
|
||||
|
||||
type leaseStressConfig struct {
|
||||
numLeases int
|
||||
keysPerLease int
|
||||
qps int
|
||||
}
|
||||
|
||||
type leaseStresser struct {
|
||||
endpoint string
|
||||
cancel func()
|
||||
@ -110,36 +102,6 @@ func (al *atomicLeases) getLeasesMap() map[int64]time.Time {
|
||||
return leasesCopy
|
||||
}
|
||||
|
||||
type leaseStresserBuilder func(m *member) Stresser
|
||||
|
||||
func newLeaseStresserBuilder(s string, lsConfig *leaseStressConfig) leaseStresserBuilder {
|
||||
// TODO: probably need to combine newLeaseStresserBuilder with newStresserBuilder to have a unified stresser builder.
|
||||
switch s {
|
||||
case "nop":
|
||||
return func(*member) Stresser {
|
||||
return &nopStresser{
|
||||
start: time.Now(),
|
||||
qps: lsConfig.qps,
|
||||
}
|
||||
}
|
||||
case "default":
|
||||
return func(mem *member) Stresser {
|
||||
// limit lease stresser to run 1 round per second
|
||||
l := rate.NewLimiter(rate.Limit(leasesStressRoundPs), leasesStressRoundPs)
|
||||
return &leaseStresser{
|
||||
endpoint: mem.grpcAddr(),
|
||||
numLeases: lsConfig.numLeases,
|
||||
keysPerLease: lsConfig.keysPerLease,
|
||||
rateLimiter: l,
|
||||
}
|
||||
}
|
||||
default:
|
||||
plog.Panicf("unknown stresser type: %s\n", s)
|
||||
}
|
||||
// never reach here
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) setupOnce() error {
|
||||
if ls.aliveLeases != nil {
|
||||
return nil
|
||||
@ -185,7 +147,8 @@ func (ls *leaseStresser) run() {
|
||||
defer ls.runWg.Done()
|
||||
ls.restartKeepAlives()
|
||||
for {
|
||||
if err := ls.rateLimiter.Wait(ls.ctx); err == context.Canceled {
|
||||
err := ls.rateLimiter.WaitN(ls.ctx, ls.numLeases*ls.keysPerLease)
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
plog.Debugf("creating lease on %v", ls.endpoint)
|
||||
@ -437,3 +400,5 @@ func (ls *leaseStresser) Cancel() {
|
||||
func (ls *leaseStresser) Report() (int, int) {
|
||||
return ls.success, ls.failure
|
||||
}
|
||||
|
||||
func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} }
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester")
|
||||
@ -47,8 +48,7 @@ func main() {
|
||||
stressQPS := flag.Int("stress-qps", 10000, "maximum number of stresser requests per second.")
|
||||
schedCases := flag.String("schedule-cases", "", "test case schedule")
|
||||
consistencyCheck := flag.Bool("consistency-check", true, "true to check consistency (revision, hash)")
|
||||
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
|
||||
stresserType := flag.String("stresser", "default", "specify stresser (\"default\" or \"nop\").")
|
||||
stresserType := flag.String("stresser", "keys,lease", "comma separated list of stressers (keys, lease, v2keys, nop).")
|
||||
failureTypes := flag.String("failures", "default,failpoints", "specify failures (concat of \"default\" and \"failpoints\").")
|
||||
externalFailures := flag.String("external-failures", "", "specify a path of script for enabling/disabling an external fault injector")
|
||||
flag.Parse()
|
||||
@ -67,28 +67,7 @@ func main() {
|
||||
agents[i].datadir = *datadir
|
||||
}
|
||||
|
||||
sConfig := &stressConfig{
|
||||
qps: *stressQPS,
|
||||
keyLargeSize: int(*stressKeyLargeSize),
|
||||
keySize: int(*stressKeySize),
|
||||
keySuffixRange: int(*stressKeySuffixRange),
|
||||
v2: *isV2Only,
|
||||
}
|
||||
|
||||
lsConfig := &leaseStressConfig{
|
||||
numLeases: 10,
|
||||
keysPerLease: 10,
|
||||
qps: *stressQPS, // only used to create nop stresser in leaseStresserBuilder
|
||||
}
|
||||
|
||||
c := &cluster{
|
||||
agents: agents,
|
||||
v2Only: *isV2Only,
|
||||
stressBuilder: newStressBuilder(*stresserType, sConfig),
|
||||
leaseStresserBuilder: newLeaseStresserBuilder(*stresserType, lsConfig),
|
||||
consistencyCheck: *consistencyCheck,
|
||||
}
|
||||
|
||||
c := &cluster{agents: agents}
|
||||
if err := c.bootstrap(); err != nil {
|
||||
plog.Fatal(err)
|
||||
}
|
||||
@ -129,10 +108,24 @@ func main() {
|
||||
schedule[i] = failures[caseNum]
|
||||
}
|
||||
}
|
||||
|
||||
scfg := stressConfig{
|
||||
rateLimiter: rate.NewLimiter(rate.Limit(*stressQPS), *stressQPS),
|
||||
keyLargeSize: int(*stressKeyLargeSize),
|
||||
keySize: int(*stressKeySize),
|
||||
keySuffixRange: int(*stressKeySuffixRange),
|
||||
numLeases: 10,
|
||||
keysPerLease: 10,
|
||||
}
|
||||
|
||||
t := &tester{
|
||||
failures: schedule,
|
||||
cluster: c,
|
||||
limit: *limit,
|
||||
|
||||
scfg: scfg,
|
||||
stresserType: *stresserType,
|
||||
doChecks: *consistencyCheck,
|
||||
}
|
||||
|
||||
sh := statusHandler{status: &t.status}
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
clientv2 "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
|
||||
@ -105,6 +104,19 @@ func (m *member) RevHash() (int64, int64, error) {
|
||||
return resp.Header.Revision, int64(resp.Hash), nil
|
||||
}
|
||||
|
||||
func (m *member) Rev(ctx context.Context) (int64, error) {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer cli.Close()
|
||||
resp, err := cli.Status(ctx, m.ClientURL)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return resp.Header.Revision, nil
|
||||
}
|
||||
|
||||
func (m *member) IsLeader() (bool, error) {
|
||||
cli, err := m.newClientV3()
|
||||
if err != nil {
|
||||
@ -134,19 +146,6 @@ func (m *member) SetHealthKeyV3() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) SetHealthKeyV2() error {
|
||||
cfg := clientv2.Config{Endpoints: []string{m.ClientURL}}
|
||||
c, err := clientv2.New(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
kapi := clientv2.NewKeysAPI(c)
|
||||
_, err = kapi.Set(ctx, "health", "good", nil)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *member) newClientV3() (*clientv3.Client, error) {
|
||||
return clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{m.ClientURL},
|
||||
|
@ -15,113 +15,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
clientV2 "github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
func init() {
|
||||
grpclog.SetLogger(plog)
|
||||
}
|
||||
|
||||
type stressFunc func(ctx context.Context) error
|
||||
|
||||
type stressEntry struct {
|
||||
weight float32
|
||||
f stressFunc
|
||||
}
|
||||
|
||||
type stressTable struct {
|
||||
entries []stressEntry
|
||||
sumWeights float32
|
||||
}
|
||||
|
||||
func createStressTable(entries []stressEntry) *stressTable {
|
||||
st := stressTable{entries: entries}
|
||||
for _, entry := range st.entries {
|
||||
st.sumWeights += entry.weight
|
||||
}
|
||||
return &st
|
||||
}
|
||||
|
||||
func (st *stressTable) choose() stressFunc {
|
||||
v := rand.Float32() * st.sumWeights
|
||||
var sum float32
|
||||
var idx int
|
||||
for i := range st.entries {
|
||||
sum += st.entries[i].weight
|
||||
if sum >= v {
|
||||
idx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
return st.entries[idx].f
|
||||
}
|
||||
|
||||
func newStressPut(kvc pb.KVClient, keySuffixRange, keySize int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
Value: randBytes(keySize),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRange(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressRangeInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
_, err := kvc.Range(ctx, &pb.RangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDelete(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange))),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func newStressDeleteInterval(kvc pb.KVClient, keySuffixRange int) stressFunc {
|
||||
return func(ctx context.Context) error {
|
||||
start := rand.Intn(keySuffixRange)
|
||||
end := start + 500
|
||||
_, err := kvc.DeleteRange(ctx, &pb.DeleteRangeRequest{
|
||||
Key: []byte(fmt.Sprintf("foo%016x", start)),
|
||||
RangeEnd: []byte(fmt.Sprintf("foo%016x", end)),
|
||||
}, grpc.FailFast(false))
|
||||
return err
|
||||
}
|
||||
}
|
||||
func init() { grpclog.SetLogger(plog) }
|
||||
|
||||
type Stresser interface {
|
||||
// Stress starts to stress the etcd cluster
|
||||
@ -130,244 +32,8 @@ type Stresser interface {
|
||||
Cancel()
|
||||
// Report reports the success and failure of the stress test
|
||||
Report() (success int, failure int)
|
||||
}
|
||||
|
||||
type stresser struct {
|
||||
Endpoint string
|
||||
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
|
||||
N int
|
||||
|
||||
mu sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
cancel func()
|
||||
conn *grpc.ClientConn
|
||||
|
||||
success int
|
||||
failure int
|
||||
|
||||
stressTable *stressTable
|
||||
}
|
||||
|
||||
func (s *stresser) Stress() error {
|
||||
if s.rateLimiter == nil {
|
||||
panic("expect rateLimiter to be set")
|
||||
}
|
||||
|
||||
// TODO: add backoff option
|
||||
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v (%s)", err, s.Endpoint)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(s.N)
|
||||
|
||||
s.mu.Lock()
|
||||
s.conn = conn
|
||||
s.cancel = cancel
|
||||
s.wg = wg
|
||||
s.mu.Unlock()
|
||||
|
||||
kvc := pb.NewKVClient(conn)
|
||||
|
||||
var stressEntries = []stressEntry{
|
||||
{weight: 0.7, f: newStressPut(kvc, s.keySuffixRange, s.keySize)},
|
||||
{
|
||||
weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
|
||||
f: newStressPut(kvc, s.keySuffixRange, s.keyLargeSize),
|
||||
},
|
||||
{weight: 0.07, f: newStressRange(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressRangeInterval(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDelete(kvc, s.keySuffixRange)},
|
||||
{weight: 0.07, f: newStressDeleteInterval(kvc, s.keySuffixRange)},
|
||||
}
|
||||
s.stressTable = createStressTable(stressEntries)
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
go s.run(ctx)
|
||||
}
|
||||
|
||||
plog.Printf("stresser %q is started", s.Endpoint)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stresser) run(ctx context.Context) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: 10-second is enough timeout to cover leader failure
|
||||
// and immediate leader election. Find out what other cases this
|
||||
// could be timed out.
|
||||
sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
|
||||
err := s.stressTable.choose()(sctx)
|
||||
|
||||
scancel()
|
||||
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
s.failure++
|
||||
s.mu.Unlock()
|
||||
|
||||
switch grpc.ErrorDesc(err) {
|
||||
case context.DeadlineExceeded.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure. When we terminate the leader, the request to
|
||||
// that leader cannot be processed, and times out. Also requests
|
||||
// to followers cannot be forwarded to the old leader, so timing out
|
||||
// as well. We want to keep stressing until the cluster elects a
|
||||
// new leader and start processing requests again.
|
||||
continue
|
||||
|
||||
case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
|
||||
// This retries when request is triggered at the same time as
|
||||
// leader failure and follower nodes receive time out errors
|
||||
// from losing their leader. Followers should retry to connect
|
||||
// to the new leader.
|
||||
continue
|
||||
|
||||
case etcdserver.ErrStopped.Error():
|
||||
// one of the etcd nodes stopped from failure injection
|
||||
continue
|
||||
|
||||
case transport.ErrConnClosing.Desc:
|
||||
// server closed the transport (failure injected node)
|
||||
continue
|
||||
|
||||
case rpctypes.ErrNotCapable.Error():
|
||||
// capability check has not been done (in the beginning)
|
||||
continue
|
||||
|
||||
case rpctypes.ErrTooManyRequests.Error():
|
||||
// hitting the recovering member.
|
||||
continue
|
||||
|
||||
case context.Canceled.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
|
||||
case grpc.ErrClientConnClosing.Error():
|
||||
// from stresser.Cancel method:
|
||||
return
|
||||
}
|
||||
|
||||
su, fa := s.Report()
|
||||
plog.Warningf("stresser %v (success %d, failure %d) exited with error (%v)", s.Endpoint, su, fa, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.success++
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stresser) Cancel() {
|
||||
s.mu.Lock()
|
||||
s.cancel()
|
||||
s.conn.Close()
|
||||
wg := s.wg
|
||||
s.mu.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
plog.Printf("stresser %q is canceled", s.Endpoint)
|
||||
}
|
||||
|
||||
func (s *stresser) Report() (int, int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.success, s.failure
|
||||
}
|
||||
|
||||
type stresserV2 struct {
|
||||
Endpoint string
|
||||
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
|
||||
N int
|
||||
|
||||
mu sync.Mutex
|
||||
failure int
|
||||
success int
|
||||
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (s *stresserV2) Stress() error {
|
||||
cfg := clientV2.Config{
|
||||
Endpoints: []string{s.Endpoint},
|
||||
Transport: &http.Transport{
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: s.N,
|
||||
},
|
||||
}
|
||||
c, err := clientV2.New(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kv := clientV2.NewKeysAPI(c)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.cancel = cancel
|
||||
|
||||
for i := 0; i < s.N; i++ {
|
||||
go func() {
|
||||
for {
|
||||
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
||||
key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
|
||||
_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
|
||||
setcancel()
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
if err != nil {
|
||||
s.failure++
|
||||
} else {
|
||||
s.success++
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stresserV2) Cancel() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *stresserV2) Report() (success int, failure int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.success, s.failure
|
||||
}
|
||||
|
||||
func randBytes(size int) []byte {
|
||||
data := make([]byte, size)
|
||||
for i := 0; i < size; i++ {
|
||||
data[i] = byte(int('a') + rand.Intn(26))
|
||||
}
|
||||
return data
|
||||
// Checker returns an invariant checker for after the stresser is canceled.
|
||||
Checker() Checker
|
||||
}
|
||||
|
||||
// nopStresser implements Stresser that does nothing
|
||||
@ -381,54 +47,112 @@ func (s *nopStresser) Cancel() {}
|
||||
func (s *nopStresser) Report() (int, int) {
|
||||
return int(time.Since(s.start).Seconds()) * s.qps, 0
|
||||
}
|
||||
func (s *nopStresser) Checker() Checker { return nil }
|
||||
|
||||
// compositeStresser implements a Stresser that runs a slice of
|
||||
// stressers concurrently.
|
||||
type compositeStresser struct {
|
||||
stressers []Stresser
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Stress() error {
|
||||
for i, s := range cs.stressers {
|
||||
if err := s.Stress(); err != nil {
|
||||
for j := 0; j < i; j++ {
|
||||
cs.stressers[i].Cancel()
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Cancel() {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(cs.stressers))
|
||||
for i := range cs.stressers {
|
||||
go func(s Stresser) {
|
||||
defer wg.Done()
|
||||
s.Cancel()
|
||||
}(cs.stressers[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Report() (succ int, fail int) {
|
||||
for _, stress := range cs.stressers {
|
||||
s, f := stress.Report()
|
||||
succ += s
|
||||
fail += f
|
||||
}
|
||||
return succ, fail
|
||||
}
|
||||
|
||||
func (cs *compositeStresser) Checker() Checker {
|
||||
var chks []Checker
|
||||
for _, s := range cs.stressers {
|
||||
if chk := s.Checker(); chk != nil {
|
||||
chks = append(chks, chk)
|
||||
}
|
||||
}
|
||||
if len(chks) == 0 {
|
||||
return nil
|
||||
}
|
||||
return newCompositeChecker(chks)
|
||||
}
|
||||
|
||||
type stressConfig struct {
|
||||
qps int
|
||||
keyLargeSize int
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
v2 bool
|
||||
|
||||
numLeases int
|
||||
keysPerLease int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
type stressBuilder func(m *member) Stresser
|
||||
|
||||
func newStressBuilder(s string, sc *stressConfig) stressBuilder {
|
||||
// NewStresser creates stresser from a comma separated list of stresser types.
|
||||
func NewStresser(s string, sc *stressConfig, m *member) Stresser {
|
||||
types := strings.Split(s, ",")
|
||||
if len(types) > 1 {
|
||||
stressers := make([]Stresser, len(types))
|
||||
for i, stype := range types {
|
||||
stressers[i] = NewStresser(stype, sc, m)
|
||||
}
|
||||
return &compositeStresser{stressers}
|
||||
}
|
||||
switch s {
|
||||
case "nop":
|
||||
return func(*member) Stresser {
|
||||
return &nopStresser{
|
||||
start: time.Now(),
|
||||
qps: sc.qps,
|
||||
}
|
||||
}
|
||||
case "default":
|
||||
return &nopStresser{start: time.Now(), qps: int(sc.rateLimiter.Limit())}
|
||||
case "keys":
|
||||
// TODO: Too intensive stressers can panic etcd member with
|
||||
// 'out of memory' error. Put rate limits in server side.
|
||||
stressN := 100
|
||||
l := rate.NewLimiter(rate.Limit(sc.qps), sc.qps)
|
||||
|
||||
return func(m *member) Stresser {
|
||||
if sc.v2 {
|
||||
return &stresserV2{
|
||||
Endpoint: m.ClientURL,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: stressN,
|
||||
}
|
||||
} else {
|
||||
return &stresser{
|
||||
Endpoint: m.grpcAddr(),
|
||||
keyLargeSize: sc.keyLargeSize,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: stressN,
|
||||
rateLimiter: l,
|
||||
}
|
||||
}
|
||||
return &keyStresser{
|
||||
Endpoint: m.grpcAddr(),
|
||||
keyLargeSize: sc.keyLargeSize,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "v2keys":
|
||||
return &v2Stresser{
|
||||
Endpoint: m.ClientURL,
|
||||
keySize: sc.keySize,
|
||||
keySuffixRange: sc.keySuffixRange,
|
||||
N: 100,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
case "lease":
|
||||
return &leaseStresser{
|
||||
endpoint: m.grpcAddr(),
|
||||
numLeases: sc.numLeases,
|
||||
keysPerLease: sc.keysPerLease,
|
||||
rateLimiter: sc.rateLimiter,
|
||||
}
|
||||
default:
|
||||
plog.Panicf("unknown stresser type: %s\n", s)
|
||||
}
|
||||
|
||||
return nil // never reach here
|
||||
}
|
||||
|
@ -20,11 +20,19 @@ import (
|
||||
)
|
||||
|
||||
type tester struct {
|
||||
cluster *cluster
|
||||
limit int
|
||||
|
||||
failures []failure
|
||||
cluster *cluster
|
||||
limit int
|
||||
status Status
|
||||
currentRevision int64
|
||||
|
||||
stresserType string
|
||||
scfg stressConfig
|
||||
doChecks bool
|
||||
|
||||
stresser Stresser
|
||||
checker Checker
|
||||
}
|
||||
|
||||
// compactQPS is rough number of compact requests per second.
|
||||
@ -39,6 +47,11 @@ func (tt *tester) runLoop() {
|
||||
tt.status.Failures = append(tt.status.Failures, f.Desc())
|
||||
}
|
||||
|
||||
if err := tt.resetStressCheck(); err != nil {
|
||||
plog.Errorf("%s failed to start stresser (%v)", err)
|
||||
return
|
||||
}
|
||||
|
||||
var prevCompactRev int64
|
||||
for round := 0; round < tt.limit || tt.limit == -1; round++ {
|
||||
tt.status.setRound(round)
|
||||
@ -100,26 +113,21 @@ func (tt *tester) doRound(round int) error {
|
||||
if err := f.Recover(tt.cluster, round); err != nil {
|
||||
return fmt.Errorf("recovery error: %v", err)
|
||||
}
|
||||
plog.Printf("%s recovered failure", tt.logPrefix())
|
||||
tt.cancelStresser()
|
||||
plog.Printf("%s wait until cluster is healthy", tt.logPrefix())
|
||||
if err := tt.cluster.WaitHealth(); err != nil {
|
||||
return fmt.Errorf("wait full health error: %v", err)
|
||||
}
|
||||
plog.Printf("%s recovered failure", tt.logPrefix())
|
||||
|
||||
if err := tt.checkConsistency(); err != nil {
|
||||
return fmt.Errorf("tt.checkConsistency error (%v)", err)
|
||||
}
|
||||
|
||||
plog.Printf("%s succeed!", tt.logPrefix())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *tester) updateRevision() error {
|
||||
if tt.cluster.v2Only {
|
||||
return nil
|
||||
}
|
||||
|
||||
revs, _, err := tt.cluster.getRevisionHash()
|
||||
for _, rev := range revs {
|
||||
tt.currentRevision = rev
|
||||
@ -131,7 +139,6 @@ func (tt *tester) updateRevision() error {
|
||||
}
|
||||
|
||||
func (tt *tester) checkConsistency() (err error) {
|
||||
tt.cancelStressers()
|
||||
defer func() {
|
||||
if err != nil {
|
||||
return
|
||||
@ -140,20 +147,19 @@ func (tt *tester) checkConsistency() (err error) {
|
||||
plog.Warningf("%s functional-tester returning with tt.updateRevision error (%v)", tt.logPrefix(), err)
|
||||
return
|
||||
}
|
||||
err = tt.startStressers()
|
||||
err = tt.startStresser()
|
||||
}()
|
||||
if err = tt.cluster.Checker.Check(); err != nil {
|
||||
if err = tt.checker.Check(); err != nil {
|
||||
plog.Printf("%s %v", tt.logPrefix(), err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (tt *tester) compact(rev int64, timeout time.Duration) (err error) {
|
||||
tt.cancelStressers()
|
||||
tt.cancelStresser()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
err = tt.startStressers()
|
||||
err = tt.startStresser()
|
||||
}
|
||||
}()
|
||||
|
||||
@ -182,7 +188,6 @@ func (tt *tester) defrag() error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
plog.Printf("%s defragmented...", tt.logPrefix())
|
||||
return nil
|
||||
}
|
||||
@ -207,35 +212,49 @@ func (tt *tester) cleanup() error {
|
||||
}
|
||||
caseFailedTotalCounter.WithLabelValues(desc).Inc()
|
||||
|
||||
plog.Printf("%s cleaning up...", tt.logPrefix())
|
||||
tt.cancelStresser()
|
||||
if err := tt.cluster.Cleanup(); err != nil {
|
||||
plog.Warningf("%s cleanup error: %v", tt.logPrefix(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tt.cluster.Reset(); err != nil {
|
||||
plog.Warningf("%s cleanup Bootstrap error: %v", tt.logPrefix(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return tt.resetStressCheck()
|
||||
}
|
||||
|
||||
func (tt *tester) cancelStressers() {
|
||||
func (tt *tester) cancelStresser() {
|
||||
plog.Printf("%s canceling the stressers...", tt.logPrefix())
|
||||
for _, s := range tt.cluster.Stressers {
|
||||
s.Cancel()
|
||||
}
|
||||
tt.stresser.Cancel()
|
||||
plog.Printf("%s canceled stressers", tt.logPrefix())
|
||||
}
|
||||
|
||||
func (tt *tester) startStressers() error {
|
||||
func (tt *tester) startStresser() (err error) {
|
||||
plog.Printf("%s starting the stressers...", tt.logPrefix())
|
||||
for _, s := range tt.cluster.Stressers {
|
||||
if err := s.Stress(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = tt.stresser.Stress()
|
||||
plog.Printf("%s started stressers", tt.logPrefix())
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (tt *tester) resetStressCheck() error {
|
||||
plog.Infof("%s resetting stressers and checkers...", tt.logPrefix())
|
||||
cs := &compositeStresser{}
|
||||
for _, m := range tt.cluster.Members {
|
||||
s := NewStresser(tt.stresserType, &tt.scfg, m)
|
||||
cs.stressers = append(cs.stressers, s)
|
||||
}
|
||||
tt.stresser = cs
|
||||
if !tt.doChecks {
|
||||
tt.checker = newNoChecker()
|
||||
return tt.startStresser()
|
||||
}
|
||||
chk := newHashChecker(hashAndRevGetter(tt.cluster))
|
||||
if schk := cs.Checker(); schk != nil {
|
||||
chk = newCompositeChecker([]Checker{chk, schk})
|
||||
}
|
||||
tt.checker = chk
|
||||
return tt.startStresser()
|
||||
}
|
||||
|
||||
func (tt *tester) Report() (success, failure int) { return tt.stresser.Report() }
|
||||
|
@ -14,6 +14,11 @@

package main

import (
"fmt"
"strings"
)

func getSameValue(vals map[string]int64) (int64, bool) {
var rv int64
for _, v := range vals {
@ -33,3 +38,14 @@ func max(n1, n2 int64) int64 {
}
return n2
}

func errsToError(errs []error) error {
if len(errs) == 0 {
return nil
}
stringArr := make([]string, len(errs))
for i, err := range errs {
stringArr[i] = err.Error()
}
return fmt.Errorf(strings.Join(stringArr, ", "))
}

tools/functional-tester/etcd-tester/v2_stresser.go (new file, 120 lines)
@ -0,0 +1,120 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
clientV2 "github.com/coreos/etcd/client"
|
||||
)
|
||||
|
||||
type v2Stresser struct {
|
||||
Endpoint string
|
||||
|
||||
keySize int
|
||||
keySuffixRange int
|
||||
|
||||
N int
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.Mutex
|
||||
failure int
|
||||
success int
|
||||
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Stress() error {
|
||||
cfg := clientV2.Config{
|
||||
Endpoints: []string{s.Endpoint},
|
||||
Transport: &http.Transport{
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: s.N,
|
||||
},
|
||||
}
|
||||
c, err := clientV2.New(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kv := clientV2.NewKeysAPI(c)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.cancel = cancel
|
||||
s.wg.Add(s.N)
|
||||
for i := 0; i < s.N; i++ {
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
s.run(ctx, kv)
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *v2Stresser) run(ctx context.Context, kv clientV2.KeysAPI) {
|
||||
for {
|
||||
if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
|
||||
return
|
||||
}
|
||||
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
|
||||
key := fmt.Sprintf("foo%016x", rand.Intn(s.keySuffixRange))
|
||||
_, err := kv.Set(setctx, key, string(randBytes(s.keySize)), nil)
|
||||
setcancel()
|
||||
if err == context.Canceled {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
if err != nil {
|
||||
s.failure++
|
||||
} else {
|
||||
s.success++
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Cancel() {
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Report() (success int, failure int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.success, s.failure
|
||||
}
|
||||
|
||||
func (s *v2Stresser) Checker() Checker { return nil }
|
||||
|
||||
func randBytes(size int) []byte {
|
||||
data := make([]byte, size)
|
||||
for i := 0; i < size; i++ {
|
||||
data[i] = byte(int('a') + rand.Intn(26))
|
||||
}
|
||||
return data
|
||||
}
|