diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 7844461d6..e5e7b3786 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -170,14 +170,15 @@ func TestLinearizability(t *testing.T) { t.Fatal(err) } defer clus.Close() - operations, events := testLinearizability(ctx, t, lg, clus, FailpointConfig{ + operations, watchResponses := testLinearizability(ctx, t, lg, clus, FailpointConfig{ failpoint: scenario.failpoint, count: 1, retries: 3, waitBetweenTriggers: waitBetweenFailpointTriggers, }, *scenario.traffic) forcestopCluster(clus) - longestHistory, remainingEvents := pickLongestHistory(events) + validateWatchResponses(t, watchResponses) + longestHistory, remainingEvents := watchEventHistory(watchResponses) validateEventsMatch(t, longestHistory, remainingEvents) operations = patchOperationBasedOnWatchEvents(operations, longestHistory) checkOperationsAndPersistResults(t, lg, operations, clus) @@ -185,7 +186,7 @@ func TestLinearizability(t *testing.T) { } } -func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, events [][]watchEvent) { +func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint FailpointConfig, traffic trafficConfig) (operations []porcupine.Operation, responses [][]watchResponse) { // Run multiple test components (traffic, failpoints, etc) in parallel and use canceling context to propagate stop signal. g := errgroup.Group{} trafficCtx, trafficCancel := context.WithCancel(ctx) @@ -203,11 +204,11 @@ func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus return nil }) g.Go(func() error { - events = collectClusterWatchEvents(watchCtx, t, lg, clus) + responses = collectClusterWatchEvents(watchCtx, t, lg, clus) return nil }) g.Wait() - return operations, events + return operations, responses } func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation { @@ -381,7 +382,12 @@ type trafficConfig struct { traffic Traffic } -func pickLongestHistory(ops [][]watchEvent) (longest []watchEvent, rest [][]watchEvent) { +func watchEventHistory(responses [][]watchResponse) (longest []watchEvent, rest [][]watchEvent) { + ops := make([][]watchEvent, len(responses)) + for i, resps := range responses { + ops[i] = toWatchEvents(resps) + } + sort.Slice(ops, func(i, j int) bool { return len(ops[i]) > len(ops[j]) }) diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index ff1e3f101..2aaf59b18 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -28,10 +28,10 @@ import ( "go.etcd.io/etcd/tests/v3/linearizability/model" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchEvent { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) [][]watchResponse { mux := sync.Mutex{} var wg sync.WaitGroup - memberEvents := make([][]watchEvent, len(clus.Procs)) + memberResponses := make([][]watchResponse, len(clus.Procs)) for i, member := range clus.Procs { c, err := clientv3.New(clientv3.Config{ Endpoints: member.EndpointsV3(), @@ -47,46 +47,26 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger go func(i int, c *clientv3.Client) { defer wg.Done() defer c.Close() - events := collectMemberWatchEvents(ctx, lg, c) + responses := watchMember(ctx, lg, c) mux.Lock() - memberEvents[i] = events + memberResponses[i] = responses mux.Unlock() }(i, c) } wg.Wait() - return memberEvents + return memberResponses } - -func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.Client) []watchEvent { - events := []watchEvent{} - var lastRevision int64 = 1 +func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) { + var lastRevision int64 = 0 for { select { case <-ctx.Done(): - return events + return resps default: } - for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) { + for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) { + resps = append(resps, watchResponse{resp, time.Now()}) lastRevision = resp.Header.Revision - time := time.Now() - for _, event := range resp.Events { - var op model.OperationType - switch event.Type { - case mvccpb.PUT: - op = model.Put - case mvccpb.DELETE: - op = model.Delete - } - events = append(events, watchEvent{ - Time: time, - Revision: event.Kv.ModRevision, - Op: model.EtcdOperation{ - Type: op, - Key: string(event.Kv.Key), - Value: model.ToValueOrHash(string(event.Kv.Value)), - }, - }) - } if resp.Err() != nil { lg.Info("Watch error", zap.Error(resp.Err())) } @@ -94,6 +74,63 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C } } +func validateWatchResponses(t *testing.T, responses [][]watchResponse) { + for _, memberResponses := range responses { + validateMemberWatchResponses(t, memberResponses) + } +} + +func validateMemberWatchResponses(t *testing.T, responses []watchResponse) { + var lastRevision int64 = 1 + for _, resp := range responses { + if resp.Header.Revision < lastRevision { + t.Errorf("Revision should never decrease") + } + if resp.Header.Revision == lastRevision && len(resp.Events) != 0 { + t.Errorf("Got two non-empty responses about same revision") + } + for _, event := range resp.Events { + if event.Kv.ModRevision != lastRevision+1 { + t.Errorf("Expect revision to grow by 1, last: %d, mod: %d", lastRevision, event.Kv.ModRevision) + } + lastRevision = event.Kv.ModRevision + } + if resp.Header.Revision != lastRevision { + t.Errorf("Expect response revision equal last event mod revision") + } + lastRevision = resp.Header.Revision + } +} + +func toWatchEvents(responses []watchResponse) (events []watchEvent) { + for _, resp := range responses { + for _, event := range resp.Events { + var op model.OperationType + switch event.Type { + case mvccpb.PUT: + op = model.Put + case mvccpb.DELETE: + op = model.Delete + } + events = append(events, watchEvent{ + Time: resp.time, + Revision: event.Kv.ModRevision, + Op: model.EtcdOperation{ + Type: op, + Key: string(event.Kv.Key), + Value: model.ToValueOrHash(string(event.Kv.Value)), + }, + }) + } + } + return events +} + +type watchResponse struct { + clientv3.WatchResponse + time time.Time +} + type watchEvent struct { Op model.EtcdOperation Revision int64