tests/robustness: Separate triggering failpoint from injection

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-04-06 13:52:50 +02:00
parent 932415a8d5
commit 625d427eb5
2 changed files with 76 additions and 44 deletions

View File

@ -35,8 +35,8 @@ const (
var (
KillFailpoint Failpoint = killFailpoint{}
DefragBeforeCopyPanic Failpoint = goPanicFailpoint{"defragBeforeCopy", triggerDefrag, AnyMember}
DefragBeforeRenamePanic Failpoint = goPanicFailpoint{"defragBeforeRename", triggerDefrag, AnyMember}
DefragBeforeCopyPanic Failpoint = goPanicFailpoint{"defragBeforeCopy", triggerDefrag{}, AnyMember}
DefragBeforeRenamePanic Failpoint = goPanicFailpoint{"defragBeforeRename", triggerDefrag{}, AnyMember}
BeforeCommitPanic Failpoint = goPanicFailpoint{"beforeCommit", nil, AnyMember}
AfterCommitPanic Failpoint = goPanicFailpoint{"afterCommit", nil, AnyMember}
RaftBeforeSavePanic Failpoint = goPanicFailpoint{"raftBeforeSave", nil, AnyMember}
@ -47,15 +47,15 @@ var (
BackendAfterStartDBTxnPanic Failpoint = goPanicFailpoint{"afterStartDBTxn", nil, AnyMember}
BackendBeforeWritebackBufPanic Failpoint = goPanicFailpoint{"beforeWritebackBuf", nil, AnyMember}
BackendAfterWritebackBufPanic Failpoint = goPanicFailpoint{"afterWritebackBuf", nil, AnyMember}
CompactBeforeCommitScheduledCompactPanic Failpoint = goPanicFailpoint{"compactBeforeCommitScheduledCompact", triggerCompact, AnyMember}
CompactAfterCommitScheduledCompactPanic Failpoint = goPanicFailpoint{"compactAfterCommitScheduledCompact", triggerCompact, AnyMember}
CompactBeforeSetFinishedCompactPanic Failpoint = goPanicFailpoint{"compactBeforeSetFinishedCompact", triggerCompact, AnyMember}
CompactAfterSetFinishedCompactPanic Failpoint = goPanicFailpoint{"compactAfterSetFinishedCompact", triggerCompact, AnyMember}
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
CompactBeforeCommitScheduledCompactPanic Failpoint = goPanicFailpoint{"compactBeforeCommitScheduledCompact", triggerCompact{}, AnyMember}
CompactAfterCommitScheduledCompactPanic Failpoint = goPanicFailpoint{"compactAfterCommitScheduledCompact", triggerCompact{}, AnyMember}
CompactBeforeSetFinishedCompactPanic Failpoint = goPanicFailpoint{"compactBeforeSetFinishedCompact", triggerCompact{}, AnyMember}
CompactAfterSetFinishedCompactPanic Failpoint = goPanicFailpoint{"compactAfterSetFinishedCompact", triggerCompact{}, AnyMember}
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact{}, AnyMember}
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact{}, AnyMember}
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: false}
BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true}
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{triggerBlackhole{waitTillSnapshot: false}}
BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{triggerBlackhole{waitTillSnapshot: true}}
DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
oneNodeClusterFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
@ -73,18 +73,18 @@ var (
RandomOneNodeClusterFailpoint Failpoint = randomFailpoint{oneNodeClusterFailpoints}
RaftBeforeFollowerSendPanic Failpoint = goPanicFailpoint{"raftBeforeFollowerSend", nil, Follower}
RandomMultiNodeClusterFailpoint Failpoint = randomFailpoint{append(oneNodeClusterFailpoints, RaftBeforeFollowerSendPanic)}
RaftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", triggerBlackholeUntilSnapshot, Follower}
RaftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", triggerBlackholeUntilSnapshot, Follower}
RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackholeUntilSnapshot, Follower}
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RaftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterApplySnapPanic Failpoint = goPanicFailpoint{"raftAfterApplySnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
BlackholeUntilSnapshot,
}}
)
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
func injectFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
@ -107,7 +107,7 @@ func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *
}
lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
err = config.failpoint.Inject(ctx, t, lg, clus)
if err != nil {
select {
case <-ctx.Done():
@ -171,14 +171,18 @@ type FailpointConfig struct {
}
type Failpoint interface {
Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Name() string
AvailabilityChecker
}
type AvailabilityChecker interface {
Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool
}
type killFailpoint struct{}
func (f killFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
for member.IsRunning() {
@ -210,10 +214,15 @@ func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess)
type goPanicFailpoint struct {
failpoint string
trigger func(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error
trigger trigger
target failpointTarget
}
type trigger interface {
Trigger(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error
AvailabilityChecker
}
type failpointTarget string
const (
@ -222,7 +231,7 @@ const (
Follower failpointTarget = "Follower"
)
func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := f.pickMember(t, clus)
for member.IsRunning() {
@ -237,7 +246,7 @@ func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Log
}
if f.trigger != nil {
lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
err = f.trigger(t, ctx, member, clus)
err = f.trigger.Trigger(t, ctx, member, clus)
if err != nil {
lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
}
@ -275,6 +284,9 @@ func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member
if f.target == Follower && config.ClusterSize == 1 {
return false
}
if f.trigger != nil && !f.trigger.Available(config, member) {
return false
}
memberFailpoints := member.Failpoints()
if memberFailpoints == nil {
return false
@ -288,7 +300,9 @@ func (f goPanicFailpoint) Name() string {
return f.failpoint
}
func triggerDefrag(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
type triggerDefrag struct{}
func (t triggerDefrag) Trigger(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
cc, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsGRPC(),
Logger: zap.NewNop(),
@ -306,7 +320,13 @@ func triggerDefrag(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _
return nil
}
func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
return true
}
type triggerCompact struct{}
func (t triggerCompact) Trigger(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
cc, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsGRPC(),
Logger: zap.NewNop(),
@ -328,15 +348,15 @@ func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _
return nil
}
func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
return triggerBlackhole(t, ctx, member, clus, true)
func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
return true
}
type randomFailpoint struct {
failpoints []Failpoint
}
func (f randomFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f randomFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
availableFailpoints := make([]Failpoint, 0, len(f.failpoints))
for _, failpoint := range f.failpoints {
count := 0
@ -349,9 +369,13 @@ func (f randomFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logg
availableFailpoints = append(availableFailpoints, failpoint)
}
}
if len(availableFailpoints) == 0 {
t.Errorf("No available failpoints")
return nil
}
failpoint := availableFailpoints[rand.Int()%len(availableFailpoints)]
lg.Info("Triggering failpoint\n", zap.String("failpoint", failpoint.Name()))
return failpoint.Trigger(ctx, t, lg, clus)
return failpoint.Inject(ctx, t, lg, clus)
}
func (f randomFailpoint) Name() string {
@ -363,15 +387,31 @@ func (f randomFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess
}
type blackholePeerNetworkFailpoint struct {
triggerBlackhole
}
func (f blackholePeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
return f.Trigger(t, ctx, member, clus)
}
func (f blackholePeerNetworkFailpoint) Name() string {
return "blackhole"
}
type triggerBlackhole struct {
waitTillSnapshot bool
}
func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
return triggerBlackhole(t, ctx, member, clus, f.waitTillSnapshot)
func (tb triggerBlackhole) Trigger(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
return blackhole(t, ctx, member, clus, tb.waitTillSnapshot)
}
func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
return config.ClusterSize > 1 && process.PeerProxy() != nil
}
func blackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()
// Blackholing will cause peers to not be able to use streamWriters registered with member
@ -460,21 +500,13 @@ func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64,
return resp.Header.Revision, err
}
func (f blackholePeerNetworkFailpoint) Name() string {
return "blackhole"
}
func (f blackholePeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, proc e2e.EtcdProcess) bool {
return config.ClusterSize > 1 && proc.PeerProxy() != nil
}
type delayPeerNetworkFailpoint struct {
duration time.Duration
baseLatency time.Duration
randomizedLatency time.Duration
}
func (f delayPeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
proxy := member.PeerProxy()
@ -492,6 +524,6 @@ func (f delayPeerNetworkFailpoint) Name() string {
return "delay"
}
func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, proc e2e.EtcdProcess) bool {
return config.ClusterSize > 1 && proc.PeerProxy() != nil
func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
return config.ClusterSize > 1 && clus.PeerProxy() != nil
}

View File

@ -244,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
g.Go(func() error {
defer close(finishTraffic)
triggerFailpoints(ctx, t, lg, clus, failpoint)
injectFailpoints(ctx, t, lg, clus, failpoint)
time.Sleep(time.Second)
return nil
})