Merge pull request #15250 from serathius/linearizability-validate-watch

tests: Validate watch responses in linearizability tests
This commit is contained in:
Marek Siarkowicz 2023-02-07 14:50:34 +01:00 committed by GitHub
commit 9bae010994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 36 deletions

View File

@ -170,14 +170,15 @@ func TestLinearizability(t *testing.T) {
t.Fatal(err)
}
defer clus.Close()
operations, events := testLinearizability(ctx, t, lg, clus, FailpointConfig{
operations, watchResponses := testLinearizability(ctx, t, lg, clus, FailpointConfig{
failpoint: scenario.failpoint,
count: 1,
retries: 3,
waitBetweenTriggers: waitBetweenFailpointTriggers,
}, *scenario.traffic)
forcestopCluster(clus)
longestHistory, remainingEvents := pickLongestHistory(events)
validateWatchResponses(t, watchResponses)
longestHistory, remainingEvents := watchEventHistory(watchResponses)
validateEventsMatch(t, longestHistory, remainingEvents)
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
checkOperationsAndPersistResults(t, lg, operations, clus)
@ -185,7 +186,7 @@ func TestLinearizability(t *testing.T) {
}
}
func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) {
func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
// Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal.
g := errgroup.Group{}
trafficCtx, trafficCancel := context.WithCancel(ctx)
@ -203,11 +204,11 @@ func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus
return nil
})
g.Go(func() error {
events = collectClusterWatchEvents(watchCtx, t, lg, clus)
responses = collectClusterWatchEvents(watchCtx, t, lg, clus)
return nil
})
g.Wait()
return operations, events
return operations, responses
}
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
@ -381,7 +382,12 @@ type trafficConfig struct {
traffic Traffic
}
func pickLongestHistory(ops [][]watchEvent) (longest []watchEvent, rest [][]watchEvent) {
func watchEventHistory(responses [][]watchResponse) (longest []watchEvent, rest [][]watchEvent) {
ops := make([][]watchEvent, len(responses))
for i, resps := range responses {
ops[i] = toWatchEvents(resps)
}
sort.Slice(ops, func(i, j int) bool {
return len(ops[i]) > len(ops[j])
})

View File

@ -28,10 +28,10 @@ import (
"go.etcd.io/etcd/tests/v3/linearizability/model"
)
func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchEvent {
func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchResponse {
mux := sync.Mutex{}
var wg sync.WaitGroup
memberEvents := make([][]watchEvent, len(clus.Procs))
memberResponses := make([][]watchResponse, len(clus.Procs))
for i, member := range clus.Procs {
c, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsV3(),
@ -47,46 +47,26 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
go func(i int, c *clientv3.Client) {
defer wg.Done()
defer c.Close()
events := collectMemberWatchEvents(ctx, lg, c)
responses := watchMember(ctx, lg, c)
mux.Lock()
memberEvents[i] = events
memberResponses[i] = responses
mux.Unlock()
}(i, c)
}
wg.Wait()
return memberEvents
return memberResponses
}
func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.Client) []watchEvent {
events := []watchEvent{}
var lastRevision int64 = 1
func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) {
var lastRevision int64 = 0
for {
select {
case <-ctx.Done():
return events
return resps
default:
}
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) {
resps = append(resps, watchResponse{resp, time.Now()})
lastRevision = resp.Header.Revision
time := time.Now()
for _, event := range resp.Events {
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
op = model.Put
case mvccpb.DELETE:
op = model.Delete
}
events = append(events, watchEvent{
Time: time,
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}
if resp.Err() != nil {
lg.Info("Watch error", zap.Error(resp.Err()))
}
@ -94,6 +74,63 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C
}
}
func validateWatchResponses(t *testing.T, responses [][]watchResponse) {
for _, memberResponses := range responses {
validateMemberWatchResponses(t, memberResponses)
}
}
func validateMemberWatchResponses(t *testing.T, responses []watchResponse) {
var lastRevision int64 = 1
for _, resp := range responses {
if resp.Header.Revision < lastRevision {
t.Errorf("Revision should never decrease")
}
if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
t.Errorf("Got two non-empty responses about same revision")
}
for _, event := range resp.Events {
if event.Kv.ModRevision != lastRevision+1 {
t.Errorf("Expect revision to grow by 1, last: %d, mod: %d", lastRevision, event.Kv.ModRevision)
}
lastRevision = event.Kv.ModRevision
}
if resp.Header.Revision != lastRevision {
t.Errorf("Expect response revision equal last event mod revision")
}
lastRevision = resp.Header.Revision
}
}
func toWatchEvents(responses []watchResponse) (events []watchEvent) {
for _, resp := range responses {
for _, event := range resp.Events {
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
op = model.Put
case mvccpb.DELETE:
op = model.Delete
}
events = append(events, watchEvent{
Time: resp.time,
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}
}
return events
}
type watchResponse struct {
clientv3.WatchResponse
time time.Time
}
type watchEvent struct {
Op model.EtcdOperation
Revision int64