diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go
index 1792c173e..0175dcb50 100644
--- a/tests/linearizability/failpoints.go
+++ b/tests/linearizability/failpoints.go
@@ -53,7 +53,7 @@ var (
 	CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
 	CompactAfterCommitBatchPanic  Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
 	RaftBeforeLeaderSendPanic     Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
-	BlackholePeerNetwork          Failpoint = blackholePeerNetworkFailpoint{duration: time.Second}
+	BlackholePeerNetwork          Failpoint = blackholePeerNetworkFailpoint{}
 	DelayPeerNetwork              Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
 	oneNodeClusterFailpoints      = []Failpoint{
 		KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
@@ -241,76 +241,8 @@ func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _
 	return nil
 }
 
-// latestRevisionForEndpoint gets latest revision of the first endpoint in Client.Endpoints list
-func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
-	cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
-	defer cancel()
-	resp, err := c.Status(cntx, c.Endpoints()[0])
-	if err != nil {
-		return 0, err
-	}
-	return resp.Header.Revision, err
-}
-
 func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
-	leader := clus.Procs[clus.WaitLeader(t)]
-	lc, err := clientv3.New(clientv3.Config{
-		Endpoints:            []string{leader.Config().ClientURL},
-		Logger:               zap.NewNop(),
-		DialKeepAliveTime:    1 * time.Millisecond,
-		DialKeepAliveTimeout: 5 * time.Millisecond,
-	})
-	if err != nil {
-		return err
-	}
-	defer lc.Close()
-
-	mc, err := clientv3.New(clientv3.Config{
-		Endpoints:            []string{member.Config().ClientURL},
-		Logger:               zap.NewNop(),
-		DialKeepAliveTime:    1 * time.Millisecond,
-		DialKeepAliveTimeout: 5 * time.Millisecond,
-	})
-	if err != nil {
-		return err
-	}
-	defer mc.Close()
-
-	proxy := member.PeerProxy()
-	// Blackholing will cause peers to not be able to use streamWriters registered with member
-	// but peer traffic is still possible because member has 'pipeline' with peers
-	// TODO: find a way to stop all traffic
-	proxy.BlackholeTx()
-	proxy.BlackholeRx()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-		// Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied.
-		revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
-		if err != nil {
-			return err
-		}
-		revLeader, err := latestRevisionForEndpoint(ctx, lc)
-		if err != nil {
-			return err
-		}
-		t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem)
-		// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
-		// Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot.
-		// That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset)
-		if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
-
-	proxy.UnblackholeTx()
-	proxy.UnblackholeRx()
-	return nil
+	return triggerBlackhole(t, ctx, member, clus, true)
 }
 
 type randomFailpoint struct {
@@ -343,24 +275,102 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
 	return true
 }
 
-type blackholePeerNetworkFailpoint struct {
-	duration time.Duration
-}
+type blackholePeerNetworkFailpoint struct{}
 
 func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
 	member := clus.Procs[rand.Int()%len(clus.Procs)]
+	return triggerBlackhole(t, ctx, member, clus, false)
+}
+
+func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
 	proxy := member.PeerProxy()
+	// Blackholing prevents peers from using the streamWriters registered with the member,
+	// but some peer traffic is still possible because the member keeps a 'pipeline' connection to its peers.
+	// TODO: find a way to stop all traffic
+	t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
 	proxy.BlackholeTx()
 	proxy.BlackholeRx()
-	lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name))
-	time.Sleep(f.duration)
-	lg.Info("Traffic restored from and to member", zap.String("member", member.Config().Name))
-	proxy.UnblackholeTx()
-	proxy.UnblackholeRx()
+	defer func() {
+		t.Logf("Traffic restored from and to member %q", member.Config().Name)
+		proxy.UnblackholeTx()
+		proxy.UnblackholeRx()
+	}()
+	if shouldWaitTillSnapshot {
+		return waitTillSnapshot(ctx, t, clus, member)
+	} else {
+		time.Sleep(time.Second)
+		return nil
+	}
+}
+
+func waitTillSnapshot(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, blackholedMember e2e.EtcdProcess) error {
+	var endpoints []string
+	for _, ep := range clus.EndpointsV3() {
+		if ep == blackholedMember.Config().ClientURL {
+			continue
+		}
+		endpoints = append(endpoints, ep)
+	}
+	clusterClient, err := clientv3.New(clientv3.Config{
+		Endpoints:            endpoints,
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer clusterClient.Close()
+
+	blackholedMemberClient, err := clientv3.New(clientv3.Config{
+		Endpoints:            []string{blackholedMember.Config().ClientURL},
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer blackholedMemberClient.Close()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		// Have to refresh blackholedMemberRevision. It can still increase as blackholedMember processes changes that are received but not yet applied.
+		blackholedMemberRevision, err := latestRevisionForEndpoint(ctx, blackholedMemberClient)
+		if err != nil {
+			return err
+		}
+		clusterRevision, err := latestRevisionForEndpoint(ctx, clusterClient)
+		if err != nil {
+			return err
+		}
+		t.Logf("clusterRevision: %d, blackholedMemberRevision: %d", clusterRevision, blackholedMemberRevision)
+		// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
+		// Need to make sure the leader has compacted past blackholedMemberRevision inside EtcdServer.snapshot.
+		// That's why we wait for clus.Cfg.SnapshotCount (to trigger a snapshot) + clus.Cfg.SnapshotCatchUpEntries (the EtcdServer.snapshot compaction offset).
+		if clusterRevision-blackholedMemberRevision > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
 	return nil
 }
 
+// latestRevisionForEndpoint gets latest revision of the first endpoint in Client.Endpoints list
+func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
+	cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	resp, err := c.Status(cntx, c.Endpoints()[0])
+	if err != nil {
+		return 0, err
+	}
+	return resp.Header.Revision, err
+}
+
 func (f blackholePeerNetworkFailpoint) Name() string {
 	return "blackhole"
 }
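The exit condition in waitTillSnapshot is the heart of this change, so a minimal, self-contained sketch of the same arithmetic follows. It is illustrative only: the real values come from clus.Cfg.SnapshotCount and clus.Cfg.SnapshotCatchUpEntries, and snapshotGapExceeded is a hypothetical helper, not part of this patch.

	package main

	import "fmt"

	// snapshotGapExceeded mirrors waitTillSnapshot's exit condition: the blackholed
	// member must trail the rest of the cluster by more than SnapshotCount (so the
	// leader takes a snapshot) plus SnapshotCatchUpEntries (entries the leader keeps
	// after compacting). Past that point the leader has compacted beyond the member's
	// last applied revision and can only catch it up via a full snapshot transfer.
	func snapshotGapExceeded(clusterRev, blackholedRev int64, snapshotCount, catchUpEntries uint64) bool {
		return clusterRev-blackholedRev > int64(snapshotCount+catchUpEntries)
	}

	func main() {
		// With illustrative values SnapshotCount=100 and SnapshotCatchUpEntries=100,
		// the member must lag by more than 200 revisions before the blackhole is lifted.
		fmt.Println(snapshotGapExceeded(350, 100, 100, 100)) // true
		fmt.Println(snapshotGapExceeded(250, 100, 100, 100)) // false
	}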