diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index 516e5d5df..6ca9b76ec 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -316,6 +316,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) model.WatchRe } resp.IsProgressNotify = r.IsProgressNotify() resp.Revision = r.Header.Revision + resp.MemberId = r.Header.MemberId err := r.Err() if err != nil { resp.Error = r.Err().Error() diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index 63ba6b377..b792007dc 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -101,6 +101,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce watchProgressNotifyEnabled := c.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0 validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled) } + validateWatchSequential(t, r.Client) validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()} r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute) diff --git a/tests/robustness/model/watch.go b/tests/robustness/model/watch.go index fc880e30e..5d8bd38b9 100644 --- a/tests/robustness/model/watch.go +++ b/tests/robustness/model/watch.go @@ -26,5 +26,6 @@ type WatchResponse struct { IsProgressNotify bool Revision int64 Time time.Duration + MemberId uint64 Error string } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 3da853f00..254d81164 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -16,6 +16,7 @@ package robustness import ( "context" + "sort" "sync" "testing" "time" @@ -141,3 +142,58 @@ external: t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify) } } + +type timeLastRevision struct { + time time.Duration + lastRevision int64 +} + +func combineWatchResponses(reports []report.ClientReport) map[uint64][]timeLastRevision { + result := make(map[uint64][]timeLastRevision) + for _, r := range reports { + for _, op := range r.Watch { + for _, resp := range op.Responses { + if len(resp.Events) == 0 { + continue + } + result[resp.MemberId] = append(result[resp.MemberId], timeLastRevision{time: resp.Time, lastRevision: resp.Events[len(resp.Events)-1].Revision}) + } + } + } + for memberId, structs := range result { + sort.Slice(structs, func(i, j int) bool { + return structs[i].time < structs[j].time + }) + result[memberId] = structs + } + return result +} + +func validateWatchSequential(t *testing.T, reports []report.ClientReport) { + combinedWatchResponses := combineWatchResponses(reports) + for _, r := range reports { + for _, op := range r.Watch { + if op.Request.Revision != 0 { + continue + } + for _, resp := range op.Responses { + if len(resp.Events) == 0 { + continue + } + var lastMemberWatchRevision int64 + for i, c := range combinedWatchResponses[resp.MemberId] { + // Reports are sorted by time, find first greater or equal and use previous one. + if resp.Time >= c.time { + if i == 0 { + continue + } + lastMemberWatchRevision = combinedWatchResponses[resp.MemberId][i-1].lastRevision + } + } + if resp.Events[0].Revision < lastMemberWatchRevision { + t.Errorf("Error watch is not sequential, expect: %v or higher, got: %v, member id: %v", lastMemberWatchRevision, resp.Events[0].Revision, resp.MemberId) + } + } + } + } +}