functional-tester/tester: refactor cluster code to support liveness mode

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee
2018-04-04 13:04:21 -07:00
parent b3fea7ed53
commit 808966a7e9
6 changed files with 51 additions and 38 deletions

View File

@@ -293,7 +293,10 @@ func (clus *Cluster) updateFailures() {
             clus.lg.Info("no failpoints found!", zap.Error(fperr))
         }
         clus.failures = append(clus.failures, fpFailures...)
-    case "NO_FAIL":
+    case "NO_FAIL_WITH_STRESS":
         clus.failures = append(clus.failures, newFailureNoOp())
+    case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
+        // TODO
+        clus.failures = append(clus.failures, newFailureNoOp())
     case "EXTERNAL":
         clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
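Both new case names map to the same no-op failure for now. `newFailureNoOp` is not shown in this diff; a minimal sketch of what it plausibly looks like, assuming the `failure` interface implied by the `Inject`/`Recover`/`Desc` call sites later in this commit:

```go
// Sketch only, not part of this commit: a failure that breaks nothing,
// so a round exercises just the stressers and health/consistency checks.
type failureNoOp struct{}

func newFailureNoOp() failure { return &failureNoOp{} }

func (f *failureNoOp) Inject(clus *Cluster) error  { return nil } // nothing to inject
func (f *failureNoOp) Recover(clus *Cluster) error { return nil } // nothing to recover
func (f *failureNoOp) Desc() string                { return "no failure" }
```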

View File

@@ -131,6 +131,8 @@ func Test_newCluster(t *testing.T) {
             "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
             "DELAY_PEER_PORT_TX_RX_LEADER",
             "DELAY_PEER_PORT_TX_RX_ALL",
+            "NO_FAIL_WITH_STRESS",
+            "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
         },
         FailureShuffle:    true,
         FailpointCommands: []string{`panic("etcd-tester")`},
@@ -142,6 +144,7 @@ func Test_newCluster(t *testing.T) {
         StressKeySuffixRange:    250000,
         StressKeySuffixRangeTxn: 100,
         StressKeyTxnOps:         10,
+        StressClients:           100,
         StressQPS:               1000,
     },
 }

View File

@@ -116,30 +116,34 @@ func (clus *Cluster) doRound() error {
         zap.Int("round", clus.rd),
         zap.Strings("failures", clus.failureStrings()),
     )
-    for i, f := range clus.failures {
+    for i, fa := range clus.failures {
         clus.cs = i
-        caseTotalCounter.WithLabelValues(f.Desc()).Inc()
+        caseTotalCounter.WithLabelValues(fa.Desc()).Inc()
         clus.lg.Info("wait health before injecting failures")
         if err := clus.WaitHealth(); err != nil {
             return fmt.Errorf("wait full health error: %v", err)
         }
+        // TODO: "NO_FAIL_WITH_STRESS"
+        // TODO: "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS"
         clus.lg.Info(
             "injecting failure",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
-            zap.String("desc", f.Desc()),
+            zap.String("desc", fa.Desc()),
         )
-        if err := f.Inject(clus); err != nil {
+        if err := fa.Inject(clus); err != nil {
             return fmt.Errorf("injection error: %v", err)
         }
         clus.lg.Info(
             "injected failure",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
-            zap.String("desc", f.Desc()),
+            zap.String("desc", fa.Desc()),
         )
         // if run local, recovering server may conflict
@@ -149,16 +153,16 @@
             "recovering failure",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
-            zap.String("desc", f.Desc()),
+            zap.String("desc", fa.Desc()),
         )
-        if err := f.Recover(clus); err != nil {
+        if err := fa.Recover(clus); err != nil {
             return fmt.Errorf("recovery error: %v", err)
         }
         clus.lg.Info(
             "recovered failure",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
-            zap.String("desc", f.Desc()),
+            zap.String("desc", fa.Desc()),
         )
         clus.lg.Info("pausing stresser after failure recovery, before wait health")
@@ -168,6 +172,7 @@
         if err := clus.WaitHealth(); err != nil {
             return fmt.Errorf("wait full health error: %v", err)
         }
+        clus.lg.Info("check consistency after recovering failures")
         if err := clus.checkConsistency(); err != nil {
             return fmt.Errorf("tt.checkConsistency error (%v)", err)
@@ -177,7 +182,7 @@
             "failure case passed",
             zap.Int("round", clus.rd),
             zap.Int("case", clus.cs),
-            zap.String("desc", f.Desc()),
+            zap.String("desc", fa.Desc()),
         )
     }
@@ -186,6 +191,7 @@
         zap.Int("round", clus.rd),
         zap.Strings("failures", clus.failureStrings()),
     )
     return nil
 }
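The two TODOs in the loop above mark where liveness mode will hook in. A hypothetical sketch of the intended branching; only the case names come from this commit, and the case-name lookup and pause helper are placeholders (the tester already pauses stressers elsewhere, per the log line above):

```go
// Hypothetical sketch, not in this commit: how the loop could branch
// on the configured case name before injecting anything. caseName is
// assumed to be tracked alongside each failure; fa.Desc() may return a
// human-readable string rather than the config token.
switch caseName {
case "NO_FAIL_WITH_STRESS":
    // inject nothing; stressers keep running against a healthy cluster
case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
    // also stop stress traffic so the round checks pure liveness
    clus.pauseStresser() // placeholder helper
}
```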

View File

@@ -97,6 +97,8 @@ tester-config:
   - DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER
   - DELAY_PEER_PORT_TX_RX_LEADER
   - DELAY_PEER_PORT_TX_RX_ALL
+  - NO_FAIL_WITH_STRESS
+  - NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS
   failure-shuffle: true
   failpoint-commands:
@@ -110,7 +112,6 @@ tester-config:
   stress-types:
   - KV
   - LEASE
-  # - NO_STRESS
   # - ELECTION_RUNNER
   # - WATCH_RUNNER
   # - LOCK_RACER_RUNNER
@@ -121,4 +122,6 @@ tester-config:
   stress-key-suffix-range: 250000
   stress-key-suffix-range-txn: 100
   stress-key-txn-ops: 10
+  stress-clients: 100
+  stress-qps: 1000
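For context on how the two new knobs likely interact: `stress-qps` plausibly configures one shared `rate.Limiter`, while `stress-clients` sets how many goroutines draw from it (consistent with the `rateLimiter.Wait` call in the key stresser below), so aggregate load stays capped no matter how many clients run. A sketch under that assumption:

```go
package tester

import "golang.org/x/time/rate"

// newStressLimiter is illustrative, not from this commit: one limiter
// shared by all stress-clients goroutines caps total throughput at
// roughly qps regardless of the client count.
func newStressLimiter(qps int) *rate.Limiter {
    // burst == qps is an assumption; the commit does not show the burst value.
    return rate.NewLimiter(rate.Limit(qps), qps)
}
```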

View File

@@ -44,14 +44,15 @@ type keyStresser struct {
     keyTxnSuffixRange int
     keyTxnOps         int
-    N                 int
     rateLimiter *rate.Limiter
-    wg          sync.WaitGroup
+    wg       sync.WaitGroup
+    clientsN int
+    ctx    context.Context
     cancel func()
     cli    *clientv3.Client
     // atomicModifiedKeys records the number of keys created and deleted by the stresser.
     atomicModifiedKeys int64
@@ -59,40 +60,37 @@ type keyStresser struct {
 }

 func (s *keyStresser) Stress() error {
-    // TODO: add backoff option
-    cli, err := s.m.CreateEtcdClient()
+    var err error
+    s.cli, err = s.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
     if err != nil {
         return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
     }
-    ctx, cancel := context.WithCancel(context.Background())
-    s.wg.Add(s.N)
-    s.cli = cli
-    s.cancel = cancel
+    s.ctx, s.cancel = context.WithCancel(context.Background())
+    s.wg.Add(s.clientsN)

     var stressEntries = []stressEntry{
-        {weight: 0.7, f: newStressPut(cli, s.keySuffixRange, s.keySize)},
+        {weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
         {
             weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
-            f:      newStressPut(cli, s.keySuffixRange, s.keyLargeSize),
+            f:      newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize),
         },
-        {weight: 0.07, f: newStressRange(cli, s.keySuffixRange)},
-        {weight: 0.07, f: newStressRangeInterval(cli, s.keySuffixRange)},
-        {weight: 0.07, f: newStressDelete(cli, s.keySuffixRange)},
-        {weight: 0.07, f: newStressDeleteInterval(cli, s.keySuffixRange)},
+        {weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)},
+        {weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
+        {weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)},
+        {weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
     }
     if s.keyTxnSuffixRange > 0 {
         // adjust to make up ~70% of workloads with writes
         stressEntries[0].weight = 0.35
         stressEntries = append(stressEntries, stressEntry{
             weight: 0.35,
-            f:      newStressTxn(cli, s.keyTxnSuffixRange, s.keyTxnOps),
+            f:      newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps),
         })
     }
     s.stressTable = createStressTable(stressEntries)

-    for i := 0; i < s.N; i++ {
-        go s.run(ctx)
+    for i := 0; i < s.clientsN; i++ {
+        go s.run()
     }

     s.lg.Info(
@@ -102,18 +100,18 @@ func (s *keyStresser) Stress() error {
     return nil
 }

-func (s *keyStresser) run(ctx context.Context) {
+func (s *keyStresser) run() {
     defer s.wg.Done()
     for {
-        if err := s.rateLimiter.Wait(ctx); err == context.Canceled {
+        if err := s.rateLimiter.Wait(s.ctx); err == context.Canceled {
             return
         }

         // TODO: a 10-second timeout is enough to cover leader failure
         // and immediate leader election; find out in which other cases
         // this could time out.
-        sctx, scancel := context.WithTimeout(ctx, 10*time.Second)
+        sctx, scancel := context.WithTimeout(s.ctx, 10*time.Second)
         err, modifiedKeys := s.stressTable.choose()(sctx)
         scancel()
         if err == nil {
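`createStressTable` and `choose` are referenced but not shown in this diff. A minimal sketch of how such a weighted picker is commonly built (cumulative weights plus one random draw), assuming the `stressFunc` signature implied by `s.stressTable.choose()(sctx)` returning `(err, modifiedKeys)`:

```go
package tester

import (
    "context"
    "math/rand"
)

// Assumed from the call sites in this diff: choose() returns a function
// taking a context and returning (error, modified-key count).
type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)

type stressEntry struct {
    weight float32
    f      stressFunc
}

type stressTable struct {
    entries    []stressEntry
    sumWeights float32
}

func createStressTable(entries []stressEntry) *stressTable {
    st := stressTable{entries: entries}
    for _, entry := range st.entries {
        st.sumWeights += entry.weight
    }
    return &st
}

// choose picks an entry with probability proportional to its weight.
func (st *stressTable) choose() stressFunc {
    v := rand.Float32() * st.sumWeights
    var sum float32
    var chosen stressFunc
    for i := range st.entries {
        sum += st.entries[i].weight
        if sum >= v {
            chosen = st.entries[i].f
            break
        }
    }
    return chosen
}
```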

View File

@@ -33,8 +33,8 @@ import (
 const (
     // time to live for lease
-    TTL      = 120
-    TTLShort = 2
+    defaultTTL      = 120
+    defaultTTLShort = 2
 )

 type leaseStresser struct {
@@ -201,7 +201,7 @@ func (ls *leaseStresser) createAliveLeases() {
         wg.Add(1)
         go func() {
             defer wg.Done()
-            leaseID, err := ls.createLeaseWithKeys(TTL)
+            leaseID, err := ls.createLeaseWithKeys(defaultTTL)
             if err != nil {
                 ls.lg.Debug(
                     "createLeaseWithKeys failed",
@@ -228,7 +228,7 @@ func (ls *leaseStresser) createShortLivedLeases() {
         wg.Add(1)
         go func() {
             defer wg.Done()
-            leaseID, err := ls.createLeaseWithKeys(TTLShort)
+            leaseID, err := ls.createLeaseWithKeys(defaultTTLShort)
             if err != nil {
                 return
             }
@@ -323,7 +323,7 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
     // if it is renewed, invariant checking has at least TTL/2 time before the lease expires, which is long enough for the checking to finish.
     // if it is not renewed, we remove the lease from the alive map so that the lease doesn't expire during invariant checking.
     renewTime, ok := ls.aliveLeases.read(leaseID)
-    if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
+    if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) {
         ls.aliveLeases.remove(leaseID)
         ls.lg.Debug(
             "keepLeaseAlive lease has not been renewed, dropped it",
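A worked reading of the renamed check: with `defaultTTL = 120`, `defaultTTL/2*time.Second` is 60 seconds, so a lease whose last successful renew is older than half its TTL is dropped from `aliveLeases` rather than risk expiring while invariant checking is still running. An illustrative helper (not in the commit) that captures the condition:

```go
// renewStale is illustrative only: reports whether a lease last renewed
// at renewTime no longer has at least defaultTTL/2 seconds of safety
// margin left, i.e. it might expire mid-check. Assumes the defaultTTL
// constant and "time" import from the file above.
func renewStale(renewTime, now time.Time) bool {
    return renewTime.Add(defaultTTL / 2 * time.Second).Before(now)
}
```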