mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Don't require minimal for failpoint injection period
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
e246bb8d4d
commit
f285330d46
@ -29,10 +29,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
triggerTimeout = time.Minute
|
||||
waitBetweenFailpointTriggers = time.Second
|
||||
failpointInjectionsCount = 1
|
||||
failpointInjectionsRetries = 3
|
||||
triggerTimeout = time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
@ -78,45 +75,37 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
|
||||
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
|
||||
defer cancel()
|
||||
var err error
|
||||
successes := 0
|
||||
failures := 0
|
||||
for successes < failpointInjectionsCount && failures < failpointInjectionsRetries {
|
||||
time.Sleep(waitBetweenFailpointTriggers)
|
||||
|
||||
lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name()))
|
||||
if err = verifyClusterHealth(ctx, t, clus); err != nil {
|
||||
return fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
|
||||
}
|
||||
|
||||
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
|
||||
err = failpoint.Inject(ctx, t, lg, clus)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("Triggering failpoints timed out, err: %v", ctx.Err())
|
||||
default:
|
||||
}
|
||||
lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
|
||||
failures++
|
||||
continue
|
||||
}
|
||||
|
||||
lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name()))
|
||||
if err = verifyClusterHealth(ctx, t, clus); err != nil {
|
||||
return fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
|
||||
}
|
||||
|
||||
successes++
|
||||
if err = verifyClusterHealth(ctx, t, clus); err != nil {
|
||||
return nil, fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
|
||||
}
|
||||
if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries {
|
||||
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
|
||||
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
|
||||
start := time.Since(baseTime)
|
||||
err = failpoint.Inject(ctx, t, lg, clus)
|
||||
if err != nil {
|
||||
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
|
||||
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
|
||||
}
|
||||
if err = verifyClusterHealth(ctx, t, clus); err != nil {
|
||||
return nil, fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
|
||||
}
|
||||
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
|
||||
end := time.Since(baseTime)
|
||||
|
||||
return nil
|
||||
return &InjectionReport{
|
||||
Start: start,
|
||||
End: end,
|
||||
Name: failpoint.Name(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type InjectionReport struct {
|
||||
Start, End time.Duration
|
||||
Name string
|
||||
}
|
||||
|
||||
func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error {
|
||||
|
@ -110,27 +110,32 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
|
||||
defer cancel()
|
||||
g := errgroup.Group{}
|
||||
var operationReport, watchReport []report.ClientReport
|
||||
finishTraffic := make(chan struct{})
|
||||
failpointInjected := make(chan failpoint.InjectionReport, 1)
|
||||
|
||||
// using baseTime time-measuring operation to get monotonic clock reading
|
||||
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
|
||||
baseTime := time.Now()
|
||||
ids := identity.NewIDProvider()
|
||||
g.Go(func() error {
|
||||
defer close(finishTraffic)
|
||||
err := failpoint.Inject(ctx, t, lg, clus, s.failpoint)
|
||||
defer close(failpointInjected)
|
||||
// Give some time for traffic to reach qps target before injecting failpoint.
|
||||
time.Sleep(time.Second)
|
||||
fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
cancel()
|
||||
}
|
||||
// Give some time for traffic to reach qps target after injecting failpoint.
|
||||
time.Sleep(time.Second)
|
||||
lg.Info("Finished injecting failures")
|
||||
if fr != nil {
|
||||
failpointInjected <- *fr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
maxRevisionChan := make(chan int64, 1)
|
||||
g.Go(func() error {
|
||||
defer close(maxRevisionChan)
|
||||
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
|
||||
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, failpointInjected, baseTime, ids)
|
||||
maxRevision := operationsMaxRevision(operationReport)
|
||||
maxRevisionChan <- maxRevision
|
||||
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
|
||||
|
@ -36,17 +36,6 @@ type ClientReport struct {
|
||||
Watch []model.WatchOperation
|
||||
}
|
||||
|
||||
func (r ClientReport) SuccessfulOperations() int {
|
||||
count := 0
|
||||
for _, op := range r.KeyValue {
|
||||
resp := op.Output.(model.MaybeEtcdResponse)
|
||||
if resp.Error == "" {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func (r ClientReport) WatchEventCount() int {
|
||||
count := 0
|
||||
for _, op := range r.Watch {
|
||||
|
@ -24,7 +24,9 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/identity"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/report"
|
||||
)
|
||||
|
||||
@ -50,7 +52,7 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport {
|
||||
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport {
|
||||
mux := sync.Mutex{}
|
||||
endpoints := clus.EndpointsGRPC()
|
||||
|
||||
@ -58,7 +60,6 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
reports := []report.ClientReport{}
|
||||
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)
|
||||
|
||||
startTime := time.Now()
|
||||
cc, err := NewClient(endpoints, ids, baseTime)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -71,6 +72,9 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
}
|
||||
wg := sync.WaitGroup{}
|
||||
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
|
||||
finish := make(chan struct{})
|
||||
lg.Info("Start traffic")
|
||||
startTime := time.Since(baseTime)
|
||||
for i := 0; i < profile.ClientCount; i++ {
|
||||
wg.Add(1)
|
||||
c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
|
||||
@ -87,8 +91,20 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
mux.Unlock()
|
||||
}(c)
|
||||
}
|
||||
var fr *failpoint.InjectionReport
|
||||
select {
|
||||
case frp, ok := <-failpointInjected:
|
||||
if !ok {
|
||||
t.Fatalf("Failed to collect failpoint report")
|
||||
}
|
||||
fr = &frp
|
||||
case <-ctx.Done():
|
||||
t.Fatalf("Traffic finished before failure was injected: %s", ctx.Err())
|
||||
}
|
||||
close(finish)
|
||||
wg.Wait()
|
||||
endTime := time.Now()
|
||||
lg.Info("Finished traffic")
|
||||
endTime := time.Since(baseTime)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
// Ensure that last operation succeeds
|
||||
@ -98,23 +114,55 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
}
|
||||
reports = append(reports, cc.Report())
|
||||
|
||||
var totalOperations int
|
||||
var successfulOperations int
|
||||
for _, r := range reports {
|
||||
totalOperations += len(r.KeyValue)
|
||||
successfulOperations += r.SuccessfulOperations()
|
||||
}
|
||||
lg.Info("Recorded operations", zap.Int("operations", totalOperations), zap.Float64("successRate", float64(successfulOperations)/float64(totalOperations)))
|
||||
totalStats := calculateStats(reports, startTime, endTime)
|
||||
beforeFailpointStats := calculateStats(reports, startTime, fr.Start)
|
||||
duringFailpointStats := calculateStats(reports, fr.Start, fr.End)
|
||||
afterFailpointStats := calculateStats(reports, fr.End, endTime)
|
||||
|
||||
period := endTime.Sub(startTime)
|
||||
qps := float64(successfulOperations) / period.Seconds()
|
||||
lg.Info("Traffic from successful requests", zap.Float64("qps", qps), zap.Int("operations", successfulOperations), zap.Duration("period", period))
|
||||
if qps < profile.MinimalQPS {
|
||||
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps)
|
||||
lg.Info("Reporting complete traffic", zap.Int("successes", totalStats.successes), zap.Int("failures", totalStats.failures), zap.Float64("successRate", totalStats.successRate()), zap.Duration("period", totalStats.period), zap.Float64("qps", totalStats.QPS()))
|
||||
lg.Info("Reporting traffic before failure injection", zap.Int("successes", beforeFailpointStats.successes), zap.Int("failures", beforeFailpointStats.failures), zap.Float64("successRate", beforeFailpointStats.successRate()), zap.Duration("period", beforeFailpointStats.period), zap.Float64("qps", beforeFailpointStats.QPS()))
|
||||
lg.Info("Reporting traffic during failure injection", zap.Int("successes", duringFailpointStats.successes), zap.Int("failures", duringFailpointStats.failures), zap.Float64("successRate", duringFailpointStats.successRate()), zap.Duration("period", duringFailpointStats.period), zap.Float64("qps", duringFailpointStats.QPS()))
|
||||
lg.Info("Reporting traffic after failure injection", zap.Int("successes", afterFailpointStats.successes), zap.Int("failures", afterFailpointStats.failures), zap.Float64("successRate", afterFailpointStats.successRate()), zap.Duration("period", afterFailpointStats.period), zap.Float64("qps", afterFailpointStats.QPS()))
|
||||
|
||||
if beforeFailpointStats.QPS() < profile.MinimalQPS {
|
||||
t.Errorf("Requiring minimal %f qps before failpoint injection for test results to be reliable, got %f qps", profile.MinimalQPS, beforeFailpointStats.QPS())
|
||||
}
|
||||
// TODO: Validate QPS post failpoint injection to ensure the that we sufficiently cover period when cluster recovers.
|
||||
return reports
|
||||
}
|
||||
|
||||
func calculateStats(reports []report.ClientReport, start, end time.Duration) (ts trafficStats) {
|
||||
ts.period = end - start
|
||||
|
||||
for _, r := range reports {
|
||||
for _, op := range r.KeyValue {
|
||||
if op.Call < start.Nanoseconds() || op.Call > end.Nanoseconds() {
|
||||
continue
|
||||
}
|
||||
resp := op.Output.(model.MaybeEtcdResponse)
|
||||
if resp.Error == "" {
|
||||
ts.successes++
|
||||
} else {
|
||||
ts.failures++
|
||||
}
|
||||
}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
type trafficStats struct {
|
||||
successes, failures int
|
||||
period time.Duration
|
||||
}
|
||||
|
||||
func (ts *trafficStats) successRate() float64 {
|
||||
return float64(ts.successes) / float64(ts.successes+ts.failures)
|
||||
}
|
||||
|
||||
func (ts *trafficStats) QPS() float64 {
|
||||
return float64(ts.successes) / ts.period.Seconds()
|
||||
}
|
||||
|
||||
type Profile struct {
|
||||
Name string
|
||||
MinimalQPS float64
|
||||
|
Loading…
x
Reference in New Issue
Block a user