functional_test: code cleanup and minor enhancements

Clean up some useless or dead code;
remove some unnecessary methods.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
Benjamin Wang
2022-10-20 10:09:19 +08:00
parent e24402d39f
commit bbda1daecf
5 changed files with 55 additions and 94 deletions


@@ -101,17 +101,6 @@ func (c *caseFollower) Recover(clus *Cluster) error {
return c.recoverMember(clus, c.last)
}
func (c *caseFollower) Desc() string {
if c.desc != "" {
return c.desc
}
return c.rpcpbCase.String()
}
func (c *caseFollower) TestCase() rpcpb.Case {
return c.rpcpbCase
}
type caseLeader struct {
caseByFunc
last int
@@ -139,10 +128,6 @@ func (c *caseLeader) Recover(clus *Cluster) error {
return c.recoverMember(clus, c.last)
}
func (c *caseLeader) TestCase() rpcpb.Case {
return c.rpcpbCase
}
type caseQuorum struct {
caseByFunc
injected map[int]struct{}
@@ -167,17 +152,6 @@ func (c *caseQuorum) Recover(clus *Cluster) error {
return nil
}
func (c *caseQuorum) Desc() string {
if c.desc != "" {
return c.desc
}
return c.rpcpbCase.String()
}
func (c *caseQuorum) TestCase() rpcpb.Case {
return c.rpcpbCase
}
func pickQuorum(size int) (picked map[int]struct{}) {
picked = make(map[int]struct{})
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -230,15 +204,15 @@ type caseUntilSnapshot struct {
// all delay failure cases except the ones failing with latency
// greater than election timeout (trigger leader election and
// cluster keeps operating anyways)
var slowCases = map[rpcpb.Case]bool{
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: true,
rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: true,
rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: true,
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: true,
var slowCases = map[rpcpb.Case]struct{}{
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: {},
rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: {},
rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: {},
rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: {},
}
func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
@@ -268,7 +242,7 @@ func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
// healthy cluster could accept 1000 req/sec at least.
// 3x time to trigger snapshot.
retries := int(snapshotCount) / 1000 * 3
if v, ok := slowCases[c.TestCase()]; v && ok {
if _, ok := slowCases[c.TestCase()]; ok {
// slow network takes more retries
retries *= 5
}
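
For context, the change above switches slowCases from a map[rpcpb.Case]bool to a map[rpcpb.Case]struct{}, the idiomatic Go representation of a set: the empty struct carries no payload, so membership is checked with the comma-ok form alone and the redundant value test goes away. A minimal, self-contained sketch of the idiom (the case names and retry numbers are placeholders, not the tester's real values):

package main

import "fmt"

// caseID stands in for rpcpb.Case in this sketch.
type caseID string

// slowCases is a set: only the keys matter, the struct{} values occupy no space.
var slowCases = map[caseID]struct{}{
	"RANDOM_DELAY_PEER_PORT_TX_RX_LEADER": {},
	"RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM": {},
}

// retriesFor scales the retry budget for cases known to be slow.
func retriesFor(c caseID, base int) int {
	if _, ok := slowCases[c]; ok {
		return base * 5 // slow network takes more retries
	}
	return base
}

func main() {
	fmt.Println(retriesFor("RANDOM_DELAY_PEER_PORT_TX_RX_LEADER", 30)) // 150
	fmt.Println(retriesFor("NO_FAULT", 30))                            // 30
}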


@@ -22,8 +22,6 @@ import (
)
type caseExternal struct {
Case
desc string
rpcpbCase rpcpb.Case


@@ -43,10 +43,9 @@ import (
type Cluster struct {
lg *zap.Logger
agentConns []*grpc.ClientConn
agentClients []rpcpb.TransportClient
agentStreams []rpcpb.Transport_TransportClient
agentRequests []*rpcpb.Request
agentConns []*grpc.ClientConn
agentClients []rpcpb.TransportClient
agentStreams []rpcpb.Transport_TransportClient
testerHTTPServer *http.Server
@@ -80,7 +79,6 @@ func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
clus.agentConns = make([]*grpc.ClientConn, len(clus.Members))
clus.agentClients = make([]rpcpb.TransportClient, len(clus.Members))
clus.agentStreams = make([]rpcpb.Transport_TransportClient, len(clus.Members))
clus.agentRequests = make([]*rpcpb.Request, len(clus.Members))
clus.cases = make([]Case, 0)
lg.Info("creating members")
@@ -260,16 +258,16 @@ func (clus *Cluster) updateCases() {
fpFailures, fperr := failpointFailures(clus)
if len(fpFailures) == 0 {
clus.lg.Info("no failpoints found!", zap.Error(fperr))
} else {
clus.cases = append(clus.cases, fpFailures...)
}
clus.cases = append(clus.cases,
fpFailures...)
case "FAILPOINTS_WITH_DISK_IO_LATENCY":
fpFailures, fperr := failpointDiskIOFailures(clus)
if len(fpFailures) == 0 {
clus.lg.Info("no failpoints found!", zap.Error(fperr))
} else {
clus.cases = append(clus.cases, fpFailures...)
}
clus.cases = append(clus.cases,
fpFailures...)
}
}
}
@@ -446,13 +444,13 @@ func (clus *Cluster) sendOp(idx int, op rpcpb.Operation) error {
func (clus *Cluster) sendOpWithResp(idx int, op rpcpb.Operation) (*rpcpb.Response, error) {
// maintain the initial member object
// throughout the test time
clus.agentRequests[idx] = &rpcpb.Request{
req := &rpcpb.Request{
Operation: op,
Member: clus.Members[idx],
Tester: clus.Tester,
}
err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
err := clus.agentStreams[idx].Send(req)
clus.lg.Info(
"sent request",
zap.String("operation", op.String()),


@@ -54,7 +54,9 @@ type leaseStresser struct {
aliveLeases *atomicLeases
alivedLeasesWithShortTTL *atomicLeases
revokedLeases *atomicLeases
shortLivedLeases *atomicLeases
// The tester doesn't keep alive the shortLivedLeases,
// so they will expire after the TTL.
shortLivedLeases *atomicLeases
runWg sync.WaitGroup
aliveWg sync.WaitGroup
@@ -188,18 +190,16 @@ func (ls *leaseStresser) run() {
}
func (ls *leaseStresser) restartKeepAlives() {
for leaseID := range ls.aliveLeases.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
}
for leaseID := range ls.alivedLeasesWithShortTTL.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
f := func(leases *atomicLeases) {
for leaseID := range leases.getLeasesMap() {
ls.aliveWg.Add(1)
go func(id int64) {
ls.keepLeaseAlive(id)
}(leaseID)
}
}
f(ls.aliveLeases)
f(ls.alivedLeasesWithShortTTL)
}
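
The restartKeepAlives change above folds two identical loops into a single local closure applied to each lease group. A generic illustration of that pattern (the variable names and lease IDs are invented for the example, not taken from the tester):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	aliveLeases := []int64{0x10, 0x11}
	shortTTLLeases := []int64{0x20}

	// restart holds the shared loop body once; each group reuses it
	// instead of duplicating the goroutine-spawning code.
	restart := func(ids []int64) {
		for _, id := range ids {
			wg.Add(1)
			go func(id int64) {
				defer wg.Done()
				fmt.Printf("restarted keepalive for lease %016x\n", id)
			}(id)
		}
	}
	restart(aliveLeases)
	restart(shortTTLLeases)
	wg.Wait()
}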
func (ls *leaseStresser) createLeases() {
@@ -349,7 +349,17 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
defer ls.aliveWg.Done()
ctx, cancel := context.WithCancel(ls.ctx)
stream, err := ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
defer func() { cancel() }()
if err != nil {
ls.lg.Error(
"keepLeaseAlive lease creates stream error",
zap.String("stress-type", ls.stype.String()),
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
}
defer cancel()
for {
select {
case <-time.After(500 * time.Millisecond):
@@ -361,11 +371,17 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()),
)
// it is possible that lease expires at invariant checking phase but not at keepLeaseAlive() phase.
// this scenario is possible when alive lease is just about to expire when keepLeaseAlive() exists and expires at invariant checking phase.
// to circumvent that scenario, we check each lease before keepalive loop exist to see if it has been renewed in last TTL/2 duration.
// if it is renewed, this means that invariant checking have at least ttl/2 time before lease expires which is long enough for the checking to finish.
// if it is not renewed, we remove the lease from the alive map so that the lease doesn't expire during invariant checking
// It is possible that a lease expires at the invariant checking phase
// but not at the keepLeaseAlive() phase. This can happen when an
// alive lease is just about to expire as keepLeaseAlive() exits and
// then expires at the invariant checking phase. To circumvent this,
// we check each lease before the keepalive loop exits to see whether
// it has been renewed within the last TTL/2. If it has, invariant
// checking has at least TTL/2 before the lease expires, which is long
// enough for the checking to finish. If it has not, we remove the
// lease from the alive map so that it doesn't expire during invariant
// checking.
renewTime, ok := ls.aliveLeases.read(leaseID)
if ok && renewTime.Add(defaultTTL/2*time.Second).Before(time.Now()) {
ls.aliveLeases.remove(leaseID)
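
The comment above boils down to a single predicate: keep a lease in the alive map only if it was renewed within the last TTL/2, so that invariant checking has at least TTL/2 before the lease expires. A hedged, standalone sketch of that check (defaultTTL here is a stand-in value, not read from the tester's configuration):

package main

import (
	"fmt"
	"time"
)

const defaultTTL = 120 // seconds; stand-in for the tester's lease TTL

// renewedRecently reports whether a lease renewed at renewTime still has at
// least TTL/2 of life left at now; if it does not, the caller drops it from
// the alive map so it cannot expire during invariant checking.
func renewedRecently(renewTime, now time.Time) bool {
	return !renewTime.Add(defaultTTL / 2 * time.Second).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(renewedRecently(now.Add(-30*time.Second), now))  // true: renewed 30s ago
	fmt.Println(renewedRecently(now.Add(-100*time.Second), now)) // false: too close to expiry
}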
@@ -379,31 +395,6 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
return
}
if err != nil {
ls.lg.Debug(
"keepLeaseAlive lease creates stream error",
zap.String("stress-type", ls.stype.String()),
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
cancel()
ctx, cancel = context.WithCancel(ls.ctx)
stream, err = ls.cli.KeepAlive(ctx, clientv3.LeaseID(leaseID))
cancel()
continue
}
if err != nil {
ls.lg.Debug(
"keepLeaseAlive failed to receive lease keepalive response",
zap.String("stress-type", ls.stype.String()),
zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err),
)
continue
}
ls.lg.Debug(
"keepLeaseAlive waiting on lease stream",
zap.String("stress-type", ls.stype.String()),


@@ -15,7 +15,7 @@
package tester
import (
"fmt"
"errors"
"math/rand"
"net"
"net/url"
@@ -67,7 +67,7 @@ func errsToError(errs []error) error {
for i, err := range errs {
stringArr[i] = err.Error()
}
return fmt.Errorf(strings.Join(stringArr, ", "))
return errors.New(strings.Join(stringArr, ", "))
}
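
The errsToError change above replaces fmt.Errorf with errors.New, presumably because the joined message is a dynamic, non-constant string: any '%' inside an error text would be interpreted as a formatting verb by fmt.Errorf, and go vet can flag non-constant format strings. A small sketch of the difference (the sample error texts are made up):

package main

import (
	"errors"
	"fmt"
	"strings"
)

func main() {
	msgs := []string{"dial tcp: 100% packet loss", "context deadline exceeded"}
	joined := strings.Join(msgs, ", ")

	bad := fmt.Errorf(joined)  // the '%' is parsed as a verb and mangles the text
	good := errors.New(joined) // the text is preserved verbatim

	fmt.Println(bad)
	fmt.Println(good)
}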
func randBytes(size int) []byte {